Skip to content

Commit

Permalink
Allow using multiple bulbs at the same time with magicblueshell
Browse files Browse the repository at this point in the history
* `connect` can now be called multiple time with all the bulbs you want to connect to. Commands are then dispatched on every bulb one at a time
* `connect` now accepts bulb version as an optional argument, allowing to connect to multiple bulbs with different versions at the same time
  • Loading branch information
Betree committed Jul 22, 2017
1 parent 670938e commit 52fa1f0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 55 deletions.
6 changes: 3 additions & 3 deletions magicblue/magicbluelib.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

def connection_required(func):
"""Raise an exception before calling the actual function if the device is
not connection.
not connected.
"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
Expand All @@ -50,9 +50,9 @@ def _figure_addr_type(mac_address=None, version=None, addr_type=None):
if version == 9 or version == 10:
return btle.ADDR_TYPE_PUBLIC

if version == 8:
if version == 7 or version == 8:
return btle.ADDR_TYPE_RANDOM

# try using mac_address
if mac_address is not None:
mac_address_num = int(mac_address.replace(':', ''), 16)
Expand Down
114 changes: 62 additions & 52 deletions magicblue/magicblueshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import sys
from sys import platform as _platform

import webcolors
from webcolors import hex_to_rgb, name_to_rgb
from bluepy.btle import Scanner, DefaultDelegate
try:
from magicblue.magicbluelib import MagicBlue, Effect
Expand All @@ -30,13 +30,14 @@
class MagicBlueShell:
class Cmd:
def __init__(self, cmd_str, func, conn_required, help='', params=None,
aliases=None):
aliases=None, opt_params=None):
self.cmd_str = cmd_str
self.func = func
self.conn_required = conn_required
self.help = help
self.params = params or []
self.aliases = aliases or []
self.opt_params = opt_params or []

def __init__(self, bluetooth_adapter, bulb_version=7):
# List available commands and their usage. 'con_required' define if
Expand All @@ -51,7 +52,9 @@ def __init__(self, bluetooth_adapter, bulb_version=7):
help='List available effects',),
MagicBlueShell.Cmd('connect', self.cmd_connect, False,
help='Connect to light bulb',
params=['mac_address or ID']),
params=['mac_address||ID'],
aliases=['c'],
opt_params=['bulb version (default 7)']),
MagicBlueShell.Cmd('disconnect', self.cmd_disconnect, True,
help='Disconnect from current light bulb'),
MagicBlueShell.Cmd('set_color', self.cmd_set_color, True,
Expand All @@ -75,7 +78,7 @@ def __init__(self, bluetooth_adapter, bulb_version=7):

self.bluetooth_adapter = bluetooth_adapter
self._bulb_version = bulb_version
self._magic_blue = None
self._bulbs = []
self._devices = []
self.last_scan = None

Expand All @@ -98,12 +101,12 @@ def start_interactive_mode(self):

def exec_cmd(self, str_cmd):
cmd = self._get_command(str_cmd)
args = str_cmd.split()[1:]
if cmd is not None:
if cmd.conn_required and not (self._magic_blue and
self._magic_blue.is_connected()):
if cmd.conn_required and not (len(self._bulbs) > 0):
logger.error('You must be connected to run this command')
elif self._check_args(str_cmd, cmd):
cmd.func(str_cmd.split()[1:])
elif self._check_args(cmd, args):
cmd.func(args)
else:
logger.error('"{}" is not a valid command.'
'Type "help" to see what you can do'
Expand All @@ -112,12 +115,14 @@ def exec_cmd(self, str_cmd):
def print_usage(self, str_cmd):
cmd = self._get_command(str_cmd)
if cmd is not None:
print('Usage: {} {}'.format(cmd.cmd_str, ' '.join(cmd.params)))
params = ' '.join(cmd.params)
opt_params = ' '.join('[{}]'.format(p) for p in cmd.opt_params)
print('Usage: {} {} {}'.format(cmd.cmd_str, params, opt_params))
else:
logger.error('Unknown command {}'.format(str_cmd))
return False

def cmd_list_devices(self, *args):
def cmd_list_devices(self, args):
scan_time = 300
try:
self.last_scan = ScanDelegate()
Expand All @@ -135,65 +140,70 @@ def cmd_list_devices(self, *args):
logger.error('Problem with the Bluetooth adapter : {}'.format(e))
return False

def cmd_list_effects(self, *args):
def cmd_list_effects(self, args):
for e in Effect.__members__.keys():
print(e)

def cmd_connect(self, *args):
def cmd_connect(self, args):
bulb_version = args[1] if len(args) > 1 else self._bulb_version
# Use can enter either a mac address or the device ID from the list
if len(args[0][0]) < 4 and self.last_scan:
if len(args[0]) < 4 and self.last_scan:
try:
dev_id = int(args[0][0]) - 1
dev_id = int(args[0]) - 1
entry = self.last_scan.devices[dev_id]
mac_address = entry.addr
addr_type = entry.addrType
except Exception:
logger.error('Bad ID / MAC address : {}'.format(args[0][0]))
logger.error('Bad ID / MAC address : {}'.format(args[0]))
return False
else:
addr_type = None
mac_address = args[0][0]
self._magic_blue = MagicBlue(mac_address,
version=self._bulb_version,
addr_type=addr_type)
self._magic_blue.connect(self.bluetooth_adapter)
mac_address = args[0]
magic_blue = MagicBlue(mac_address,
version=bulb_version,
addr_type=addr_type)
magic_blue.connect(self.bluetooth_adapter)
self._bulbs.append(magic_blue)
logger.info('Connected')

def cmd_disconnect(self, *args):
self._magic_blue.disconnect()
self._magic_blue = None
for bulb in self._bulbs:
bulb.disconnect()
self._bulbs = []

def cmd_turn(self, *args):
if args[0][0] == 'on':
self._magic_blue.turn_on()
def cmd_turn(self, args):
if args[0] == 'on':
[bulb.turn_on() for bulb in self._bulbs]
else:
self._magic_blue.turn_off()

def cmd_read(self, *args):
if args[0][0] == 'name':
name = self._magic_blue.get_device_name()
logger.info('Received name: {}'.format(name))
elif args[0][0] == 'device_info':
device_info = self._magic_blue.get_device_info()
logger.info('Received device_info: {}'.format(device_info))
elif args[0][0] == 'date_time':
datetime_ = self._magic_blue.get_date_time()
logger.info('Received datetime: {}'.format(datetime_))

def cmd_set_color(self, *args):
color = args[0][0]
[bulb.turn_off() for bulb in self._bulbs]

def cmd_read(self, args):
for bulb in self._bulbs:
print('-------------------')
if args[0] == 'name':
name = bulb.get_device_name()
logger.info('Received name: {}'.format(name))
elif args[0] == 'device_info':
device_info = bulb.get_device_info()
logger.info('Received device_info: {}'.format(device_info))
elif args[0] == 'date_time':
datetime_ = bulb.get_date_time()
logger.info('Received datetime: {}'.format(datetime_))

def cmd_set_color(self, args):
color = args[0]
try:
if color.startswith('#'):
self._magic_blue.set_color(webcolors.hex_to_rgb(color))
[b.set_color(hex_to_rgb(color)) for b in self._bulbs]
else:
self._magic_blue.set_color(webcolors.name_to_rgb(color))
[b.set_color(name_to_rgb(color)) for b in self._bulbs]
except ValueError as e:
logger.error('Invalid color value : {}'.format(str(e)))
self.print_usage('set_color')

def cmd_set_warm_light(self, *args):
try:
self._magic_blue.set_warm_light(float(args[0][0]))
[bulb.set_warm_light(float(args[0][0])) for bulb in self._bulbs]
except ValueError as e:
logger.error('Invalid intensity value : {}'.format(str(e)))
self.print_usage('set_color')
Expand All @@ -208,7 +218,7 @@ def cmd_set_effect(self, *args):
except ValueError:
self.print_usage('set_effect')
else:
self._magic_blue.set_effect(effect, speed)
[bulb.set_effect(effect, speed) for bulb in self._bulbs]

def list_commands(self, *args):
print(' ----------------------------')
Expand All @@ -225,13 +235,13 @@ def list_commands(self, *args):
def cmd_exit(self, *args):
print('Bye !')

def _check_args(self, str_cmd, cmd):
expected_nb_args = len(cmd.params)
args = str_cmd.split()[1:]
if len(args) != expected_nb_args:
self.print_usage(str_cmd.split()[0])
return False
return True
def _check_args(self, cmd, args):
min_expected_nb_args = len(cmd.params)
max_expected_nb_args = min_expected_nb_args + len(cmd.opt_params)
if min_expected_nb_args <= len(args) <= max_expected_nb_args:
return True
self.print_usage(cmd.cmd_str)
return False

def _get_command(self, str_cmd):
str_cmd = str_cmd.split()[0]
Expand Down Expand Up @@ -277,7 +287,7 @@ def get_params():
default='7',
dest='bulb_version',
type=int,
help='Bulb version as displayed in the official app')
help='Bulb version (currently support 7, 8, 9 and 10)')
return parser.parse_args()


Expand Down

2 comments on commit 52fa1f0

@mouth4war
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Betree,

I've installed this commit to test it out. It doesn't seem to change behaviour from HASS UI (groups still update one at a time) so I will try from the shell.

@Betree
Copy link
Owner Author

@Betree Betree commented on 52fa1f0 Aug 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update is indeed for shell only. I've made some minor modifications in the lib but the core of this update is to be able to update multiple bulbs sequentially from the shell.

I'm not sure you can achieve real parallelism here, even with threads; I think bluetooth library / device will be a bottleneck. I still have to check BLE specifications + bluepy library details to solve these questions.

Please sign in to comment.