From f55b01d96504d2d2ff22d49519cffdac8aa6f9fb Mon Sep 17 00:00:00 2001 From: David Lechner Date: Mon, 25 Jul 2022 19:46:16 -0500 Subject: [PATCH] bluez/client: move state management to global BlueZ manager This fixes a number of bugs by moving the `BleakClient` state management to the new global BlueZ manager object. - Calling "GetManagedObjects" each time we connected caused performance issues. Fixes #500. - Calling "ConnectDevice" didn't work as expected and has been removed/ Fixes #806. - BleakClient didn't handle "InterfacesRemoved" which resulted in an invalid service dictionary in some cases. Fixes #882. --- CHANGELOG.rst | 7 + bleak/backends/bluezdbus/client.py | 331 ++++++---------------------- bleak/backends/bluezdbus/manager.py | 226 ++++++++++++++++++- 3 files changed, 300 insertions(+), 264 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b373d806..1ce56d87 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -38,7 +38,14 @@ Fixed * Documentation fixes. * On empty characteristic description from WinRT, use the lookup table instead of returning empty string. +* Fixed performance issues in BlueZ backend caused by calling "GetManagedObjects" each time a + ``BleakScanner`` scans or ``BleakClient`` is connected. Fixes #500. +* Fixed not handling "InterfacesRemoved" in ``BleakClient`` in BlueZ backend. Fixes #882. +Removed +------- + +* Removed fallback to call "ConnectDevice" when "Connect" fails in Bluez backend. Fixes #806. `0.14.3`_ (2022-04-29) ====================== diff --git a/bleak/backends/bluezdbus/client.py b/bleak/backends/bluezdbus/client.py index 39c605b2..f553ec50 100644 --- a/bleak/backends/bluezdbus/client.py +++ b/bleak/backends/bluezdbus/client.py @@ -7,24 +7,21 @@ import asyncio import os import warnings -from typing import Any, Callable, Dict, Optional, Union +from typing import Callable, Optional, Union from uuid import UUID from dbus_next.aio import MessageBus -from dbus_next.constants import BusType, ErrorType, MessageType +from dbus_next.constants import BusType, ErrorType from dbus_next.message import Message from dbus_next.signature import Variant from bleak.backends.bluezdbus import check_bluez_version, defs from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus -from bleak.backends.bluezdbus.descriptor import BleakGATTDescriptorBlueZDBus +from bleak.backends.bluezdbus.manager import get_global_bluez_manager from bleak.backends.bluezdbus.scanner import BleakScannerBlueZDBus -from bleak.backends.bluezdbus.service import BleakGATTServiceBlueZDBus -from bleak.backends.bluezdbus.signals import MatchRules, add_match from bleak.backends.bluezdbus.utils import ( assert_reply, extract_service_handle_from_path, - unpack_variants, ) from bleak.backends.client import BaseBleakClient from bleak.backends.device import BLEDevice @@ -66,10 +63,10 @@ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): # D-Bus message bus self._bus: Optional[MessageBus] = None - # D-Bus properties for the device - self._properties: Dict[str, Any] = {} - # provides synchronization between get_services() and PropertiesChanged signal - self._services_resolved_event: Optional[asyncio.Event] = None + # tracks device watcher subscription + self._remove_device_watcher: Optional[Callable] = None + # private backing for is_connected property + self._is_connected = False # indicates disconnect request in progress when not None self._disconnecting_event: Optional[asyncio.Event] = None # used to ensure device gets disconnected if event loop crashes @@ -121,193 +118,82 @@ async def connect(self, **kwargs) -> bool: "Device with address {0} was not found.".format(self.address) ) - # Create system bus + manager = await get_global_bluez_manager() + + # Each BLE connection session needs a new D-Bus connection to avoid a + # BlueZ quirk where notifications are automatically enabled on reconnect. self._bus = await MessageBus( bus_type=BusType.SYSTEM, negotiate_unix_fd=True ).connect() - try: - # Add signal handlers. These monitor the device D-Bus object and - # all of its descendats (services, characteristics, descriptors). - # This we always have an up-to-date state for all of these that is - # maintained automatically in the background. + def on_connected_changed(connected: bool) -> None: + if not connected: + logger.debug(f"Device disconnected ({self._device_path})") - self._bus.add_message_handler(self._parse_msg) + self._is_connected = False - rules = MatchRules( - interface=defs.OBJECT_MANAGER_INTERFACE, - member="InterfacesAdded", - arg0path=f"{self._device_path}/", - ) - reply = await add_match(self._bus, rules) - assert_reply(reply) + if self._disconnect_monitor_event: + self._disconnect_monitor_event.set() + self._disconnect_monitor_event = None - rules = MatchRules( - interface=defs.OBJECT_MANAGER_INTERFACE, - member="InterfacesRemoved", - arg0path=f"{self._device_path}/", - ) - reply = await add_match(self._bus, rules) - assert_reply(reply) + self._cleanup_all() + if self._disconnected_callback is not None: + self._disconnected_callback(self) + disconnecting_event = self._disconnecting_event + if disconnecting_event: + disconnecting_event.set() - rules = MatchRules( - interface=defs.PROPERTIES_INTERFACE, - member="PropertiesChanged", - path_namespace=self._device_path, - ) - reply = await add_match(self._bus, rules) - assert_reply(reply) + def on_value_changed(char_path: str, value: bytes) -> None: + if char_path in self._notification_callbacks: + handle = extract_service_handle_from_path(char_path) + self._notification_callbacks[char_path](handle, bytearray(value)) - # Find the HCI device to use for scanning and get cached device properties - reply = await self._bus.call( - Message( - destination=defs.BLUEZ_SERVICE, - path="/", - member="GetManagedObjects", - interface=defs.OBJECT_MANAGER_INTERFACE, - ) - ) - assert_reply(reply) - - interfaces_and_props: Dict[str, Dict[str, Variant]] = reply.body[0] + watcher = manager.add_device_watcher( + self._device_path, on_connected_changed, on_value_changed + ) + self._remove_device_watcher = lambda: manager.remove_device_watcher(watcher) - # The device may have been removed from BlueZ since the time we stopped scanning - if self._device_path not in interfaces_and_props: - # Sometimes devices can be removed from the BlueZ object manager - # before we connect to them. In this case we try using the - # org.bluez.Adapter1.ConnectDevice method instead. This method - # requires that bluetoothd is run with the --experimental flag - # and is available since BlueZ 5.49. - logger.debug( - f"org.bluez.Device1 object not found, trying org.bluez.Adapter1.ConnectDevice ({self._device_path})" - ) + try: + try: reply = await asyncio.wait_for( self._bus.call( Message( destination=defs.BLUEZ_SERVICE, - interface=defs.ADAPTER_INTERFACE, - path=f"/org/bluez/{self._adapter}", - member="ConnectDevice", - signature="a{sv}", - body=[ - { - "Address": Variant( - "s", self._device_info["Address"] - ), - "AddressType": Variant( - "s", self._device_info["AddressType"] - ), - } - ], + interface=defs.DEVICE_INTERFACE, + path=self._device_path, + member="Connect", ) ), timeout, ) - - # FIXME: how to cancel connection if timeout? - - if ( - reply.message_type == MessageType.ERROR - and reply.error_name == ErrorType.UNKNOWN_METHOD.value - ): - logger.debug( - f"org.bluez.Adapter1.ConnectDevice not found ({self._device_path}), try enabling bluetoothd --experimental" - ) - raise BleakError( - "Device with address {0} could not be found. " - "Try increasing `timeout` value or moving the device closer.".format( - self.address - ) - ) - assert_reply(reply) - else: - # required interface - self._properties = unpack_variants( - interfaces_and_props[self._device_path][defs.DEVICE_INTERFACE] - ) - - # optional interfaces - services and characteristics may not - # be populated yet - for path, interfaces in interfaces_and_props.items(): - if not path.startswith(self._device_path): - continue - - if defs.GATT_SERVICE_INTERFACE in interfaces: - obj = unpack_variants(interfaces[defs.GATT_SERVICE_INTERFACE]) - self.services.add_service(BleakGATTServiceBlueZDBus(obj, path)) - - if defs.GATT_CHARACTERISTIC_INTERFACE in interfaces: - obj = unpack_variants( - interfaces[defs.GATT_CHARACTERISTIC_INTERFACE] - ) - service = interfaces_and_props[obj["Service"]][ - defs.GATT_SERVICE_INTERFACE - ] - uuid = service["UUID"].value - handle = extract_service_handle_from_path(obj["Service"]) - self.services.add_characteristic( - BleakGATTCharacteristicBlueZDBus(obj, path, uuid, handle) - ) - - if defs.GATT_DESCRIPTOR_INTERFACE in interfaces: - obj = unpack_variants( - interfaces[defs.GATT_DESCRIPTOR_INTERFACE] - ) - characteristic = interfaces_and_props[obj["Characteristic"]][ - defs.GATT_CHARACTERISTIC_INTERFACE - ] - uuid = characteristic["UUID"].value - handle = extract_service_handle_from_path(obj["Characteristic"]) - self.services.add_descriptor( - BleakGATTDescriptorBlueZDBus(obj, path, uuid, handle) - ) + self._is_connected = True + except BaseException: + # calling Disconnect cancels any pending connect request try: - reply = await asyncio.wait_for( - self._bus.call( - Message( - destination=defs.BLUEZ_SERVICE, - interface=defs.DEVICE_INTERFACE, - path=self._device_path, - member="Connect", - ) - ), - timeout, + reply = await self._bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + interface=defs.DEVICE_INTERFACE, + path=self._device_path, + member="Disconnect", + ) ) - assert_reply(reply) - except BaseException: - # calling Disconnect cancels any pending connect request try: - reply = await self._bus.call( - Message( - destination=defs.BLUEZ_SERVICE, - interface=defs.DEVICE_INTERFACE, - path=self._device_path, - member="Disconnect", - ) - ) - try: - assert_reply(reply) - except BleakDBusError as e: - # if the object no longer exists, then we know we - # are disconnected for sure, so don't need to log a - # warning about it - if e.dbus_error != ErrorType.UNKNOWN_OBJECT.value: - raise - except Exception as e: - logger.warning( - f"Failed to cancel connection ({self._device_path}): {e}" - ) - - raise + assert_reply(reply) + except BleakDBusError as e: + # if the object no longer exists, then we know we + # are disconnected for sure, so don't need to log a + # warning about it + if e.dbus_error != ErrorType.UNKNOWN_OBJECT.value: + raise + except Exception as e: + logger.warning( + f"Failed to cancel connection ({self._device_path}): {e}" + ) - if self.is_connected: - logger.debug(f"Connection successful ({self._device_path})") - else: - raise BleakError( - f"Connection was not successful! ({self._device_path})" - ) + raise # Create a task that runs until the device is disconnected. self._disconnect_monitor_event = asyncio.Event() @@ -329,7 +215,7 @@ async def _disconnect_monitor(self) -> None: # loop is called with asyncio.run() or otherwise runs pending tasks # after the original event loop stops. This will also cause an exception # if a run loop is stopped before the device is disconnected since this - # task will still be running and asyncio compains if a loop with running + # task will still be running and asyncio complains if a loop with running # tasks is stopped. try: await self._disconnect_monitor_event.wait() @@ -356,6 +242,10 @@ def _cleanup_all(self) -> None: """ logger.debug(f"_cleanup_all({self._device_path})") + if self._remove_device_watcher: + self._remove_device_watcher() + self._remove_device_watcher = None + if not self._bus: logger.debug(f"already disconnected ({self._device_path})") return @@ -513,7 +403,7 @@ def is_connected(self) -> bool: """ return self._DeprecatedIsConnectedReturn( - False if self._bus is None else self._properties.get("Connected", False) + False if self._bus is None else self._is_connected ) async def _acquire_mtu(self) -> None: @@ -585,15 +475,11 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection: if self._services_resolved: return self.services - if not self._properties["ServicesResolved"]: - logger.debug(f"Waiting for ServicesResolved ({self._device_path})") - self._services_resolved_event = asyncio.Event() - try: - await asyncio.wait_for(self._services_resolved_event.wait(), 5) - finally: - self._services_resolved_event = None + manager = await get_global_bluez_manager() + self.services = await manager.get_services(self._device_path) self._services_resolved = True + return self.services # IO methods @@ -651,7 +537,8 @@ async def read_gatt_char( self._hides_device_name_characteristic ): # Simulate regular characteristics read to be consistent over all platforms. - value = bytearray(self._properties["Name"].encode("ascii")) + manager = await get_global_bluez_manager() + value = bytearray(manager.get_device_name(self._device_path).encode()) logger.debug( "Read Device Name {0} | {1}: {2}".format( char_specifier, self._device_path, value @@ -963,81 +850,3 @@ async def stop_notify( assert_reply(reply) self._notification_callbacks.pop(characteristic.path, None) - - # Internal Callbacks - - def _parse_msg(self, message: Message): - if message.message_type != MessageType.SIGNAL: - return - - logger.debug( - "received D-Bus signal: {0}.{1} ({2}): {3}".format( - message.interface, message.member, message.path, message.body - ) - ) - - if message.member == "InterfacesAdded": - path, interfaces = message.body - - if defs.GATT_SERVICE_INTERFACE in interfaces: - obj = unpack_variants(interfaces[defs.GATT_SERVICE_INTERFACE]) - # if this assert fails, it means our match rules are probably wrong - assert obj["Device"] == self._device_path - self.services.add_service(BleakGATTServiceBlueZDBus(obj, path)) - - if defs.GATT_CHARACTERISTIC_INTERFACE in interfaces: - obj = unpack_variants(interfaces[defs.GATT_CHARACTERISTIC_INTERFACE]) - service = next( - x - for x in self.services.services.values() - if x.path == obj["Service"] - ) - self.services.add_characteristic( - BleakGATTCharacteristicBlueZDBus( - obj, path, service.uuid, service.handle - ) - ) - - if defs.GATT_DESCRIPTOR_INTERFACE in interfaces: - obj = unpack_variants(interfaces[defs.GATT_DESCRIPTOR_INTERFACE]) - handle = extract_service_handle_from_path(obj["Characteristic"]) - characteristic = self.services.characteristics[handle] - self.services.add_descriptor( - BleakGATTDescriptorBlueZDBus(obj, path, characteristic.uuid, handle) - ) - elif message.member == "InterfacesRemoved": - path, interfaces = message.body - - elif message.member == "PropertiesChanged": - interface, changed, _ = message.body - changed = unpack_variants(changed) - - if interface == defs.GATT_CHARACTERISTIC_INTERFACE: - if message.path in self._notification_callbacks and "Value" in changed: - handle = extract_service_handle_from_path(message.path) - self._notification_callbacks[message.path]( - handle, bytearray(changed["Value"]) - ) - elif interface == defs.DEVICE_INTERFACE: - self._properties.update(changed) - - if "ServicesResolved" in changed: - if changed["ServicesResolved"]: - if self._services_resolved_event: - self._services_resolved_event.set() - else: - self._services_resolved = False - - if "Connected" in changed and not changed["Connected"]: - logger.debug(f"Device disconnected ({self._device_path})") - - if self._disconnect_monitor_event: - self._disconnect_monitor_event.set() - self._disconnect_monitor_event = None - - self._cleanup_all() - if self._disconnected_callback is not None: - self._disconnected_callback(self) - disconnecting_event = self._disconnecting_event - if disconnecting_event: - disconnecting_event.set() diff --git a/bleak/backends/bluezdbus/manager.py b/bleak/backends/bluezdbus/manager.py index ab50dcd5..acf249e9 100644 --- a/bleak/backends/bluezdbus/manager.py +++ b/bleak/backends/bluezdbus/manager.py @@ -13,11 +13,15 @@ from dbus_next import BusType, Message, MessageType, Variant from dbus_next.aio.message_bus import MessageBus -from typing_extensions import TypedDict, Literal +from typing_extensions import Literal, TypedDict from ...exc import BleakError +from ..service import BleakGATTServiceCollection from . import defs from .advertisement_monitor import AdvertisementMonitor, OrPatternLike +from .characteristic import BleakGATTCharacteristicBlueZDBus +from .descriptor import BleakGATTDescriptorBlueZDBus +from .service import BleakGATTServiceBlueZDBus from .signals import MatchRules, add_match from .utils import assert_reply, unpack_variants @@ -196,6 +200,42 @@ class CallbackAndState(NamedTuple): """ +DeviceConnectedChangedCallback = Callable[[bool], None] +""" +A callback that is called when a device's "Connected" property changes. + +Args: + arg0: The current value of the "Connected" property. +""" + +CharacteristicValueChangedCallback = Callable[[str, bytes], None] +""" +A callback that is called when a characteristics's "Value" property changes. + +Args: + arg0: The D-Bus object path of the characteristic. + arg1: The current value of the "Value" property. +""" + + +class DeviceWatcher(NamedTuple): + + device_path: str + """ + The D-Bus object path of the device. + """ + + on_connected_changed: DeviceConnectedChangedCallback + """ + A callback that is called when a device's "Connected" property changes. + """ + + on_characteristic_value_changed: CharacteristicValueChangedCallback + """ + A callback that is called when a characteristics's "Value" property changes. + """ + + # set of org.bluez.Device1 property names that come from advertising data _ADVERTISING_DATA_PROPERTIES = { "AdvertisingData", @@ -222,6 +262,8 @@ def __init__(self): self._properties: Dict[str, Dict[str, Dict[str, Any]]] = {} self._advertisement_callbacks: List[CallbackAndState] = [] + self._device_watchers: Set[DeviceWatcher] = set() + self._condition_callbacks: Set[Callable] = set() async def async_init(self): """ @@ -460,6 +502,153 @@ async def stop(): self._advertisement_callbacks.remove(callback_and_state) raise + def add_device_watcher( + self, + device_path: str, + on_connected_changed: DeviceConnectedChangedCallback, + on_characteristic_value_changed: CharacteristicValueChangedCallback, + ) -> DeviceWatcher: + """ + Registers a device watcher to receive callbacks when device state + changes or events are received. + + Args: + device_path: + The D-Bus object path of the device. + on_connected_changed: + A callback that is called when the device's "Connected" + state changes. + on_characteristic_value_changed: + A callback that is called whenever a characteristic receives + a notification/indication. + + Returns: + A device watcher object that acts a token to unregister the watcher. + """ + watcher = DeviceWatcher( + device_path, on_connected_changed, on_characteristic_value_changed + ) + + self._device_watchers.add(watcher) + return watcher + + def remove_device_watcher(self, watcher: DeviceWatcher) -> None: + """ + Unregisters a device watcher. + + Args: + The device watcher token that was returned by + :meth:`add_device_watcher`. + """ + self._device_watchers.remove(watcher) + + async def get_services(self, device_path: str) -> BleakGATTServiceCollection: + """ + Builds a new :class:`BleakGATTServiceCollection` from the current state. + + Args: + device_path: The D-Bus object path of the Bluetooth device. + + Returns: + A new :class:`BleakGATTServiceCollection`. + """ + await self._wait_condition(device_path, "ServicesResolved", True) + + services = BleakGATTServiceCollection() + + for service_path, service_ifaces in self._properties.items(): + if ( + not service_path.startswith(device_path) + or defs.GATT_SERVICE_INTERFACE not in service_ifaces + ): + continue + + service = BleakGATTServiceBlueZDBus( + service_ifaces[defs.GATT_SERVICE_INTERFACE], service_path + ) + + services.add_service(service) + + for char_path, char_ifaces in self._properties.items(): + if ( + not char_path.startswith(service_path) + or defs.GATT_CHARACTERISTIC_INTERFACE not in char_ifaces + ): + continue + + char = BleakGATTCharacteristicBlueZDBus( + char_ifaces[defs.GATT_CHARACTERISTIC_INTERFACE], + char_path, + service.uuid, + service.handle, + ) + + services.add_characteristic(char) + + for desc_path, desc_ifaces in self._properties.items(): + if ( + not desc_path.startswith(char_path) + or defs.GATT_DESCRIPTOR_INTERFACE not in desc_ifaces + ): + continue + + desc = BleakGATTDescriptorBlueZDBus( + desc_ifaces[defs.GATT_DESCRIPTOR_INTERFACE], + desc_path, + char.uuid, + char.handle, + ) + + services.add_descriptor(desc) + + return services + + def get_device_name(self, device_path: str) -> str: + """ + Gets the value of the "Name" property for a device. + + Args: + device_path: The D-Bus object path of the device. + + Returns: + The current property value. + """ + return self._properties[device_path][defs.DEVICE_INTERFACE]["Name"] + + async def _wait_condition( + self, device_path: str, property_name: str, property_value: Any + ) -> None: + """ + Waits for a condition to become true. + + Args: + device_path: The D-Bus object path of a Bluetooth device. + property_name: The name of the property to test. + property_value: A value to compare the current property value to. + """ + if ( + self._properties[device_path][defs.DEVICE_INTERFACE][property_name] + == property_value + ): + return + + event = asyncio.Event() + + def callback(): + if ( + self._properties[device_path][defs.DEVICE_INTERFACE][property_name] + == property_value + ): + event.set() + + self._condition_callbacks.add(callback) + + try: + # can be canceled + await event.wait() + finally: + self._condition_callbacks.remove(callback) + def _parse_msg(self, message: Message): """ Handles callbacks from dbus_next. @@ -507,9 +696,19 @@ def _parse_msg(self, message: Message): # since "GetManagedObjects" will return a newer value. pass else: + # update self._properties first + self_interface.update(unpack_variants(changed)) + for name in invalidated: + del self_interface[name] + + # then call any callbacks so they will be called with the + # updated state + if interface == defs.DEVICE_INTERFACE: + # handle advertisement watchers + for ( callback, adapter_path, @@ -537,8 +736,29 @@ def _parse_msg(self, message: Message): # TODO: this should be deep copy, not shallow callback(message.path, cast(Device1, self_interface.copy())) - for name in invalidated: - del self_interface[name] + # handle device condition watchers + for condition_callback in self._condition_callbacks: + condition_callback() + + # handle device connection change watchers + + if "Connected" in changed: + for ( + device_path, + on_connected_changed, + _, + ) in self._device_watchers.copy(): + # callbacks may remove the watcher, hence the copy() above + if message.path == device_path: + on_connected_changed(self_interface["Connected"]) + + elif interface == defs.GATT_CHARACTERISTIC_INTERFACE: + # handle characteristic value change watchers + + if "Value" in changed: + for device_path, _, on_value_changed in self._device_watchers: + if message.path.startswith(device_path): + on_value_changed(message.path, self_interface["Value"]) async def get_global_bluez_manager() -> BlueZManager: