Skip to content

Commit

Permalink
winrt/client: fix issues with getting services
Browse files Browse the repository at this point in the history
There were a number of issues with getting services not being restarted
properly when a services changed event occurred. Cancellation wasn't
handled properly in FutureLike and it wasn't used in enough places.

Now, the entire `get_services()` method is restarted when an event is
received instead of only `self._requester.get_gatt_services_async()`.
  • Loading branch information
dlech committed Nov 19, 2022
1 parent 793cd09 commit 6608999
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 85 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,14 @@ All notable changes to this project will be documented in this file.
The format is based on `Keep a Changelog <https://keepachangelog.com/en/1.0.0/>`_,
and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0.html>`_.

`Unreleased`_
=============

Fixed
-----
* Fixed getting services in WinRT backend. Fixes #1123.


`0.19.4`_ (2022-11-06)
======================

Expand Down
231 changes: 146 additions & 85 deletions bleak/backends/winrt/client.py
Expand Up @@ -6,7 +6,6 @@
"""

import asyncio
import functools
import logging
import sys
import uuid
Expand Down Expand Up @@ -278,6 +277,7 @@ def handle_disconnect():
)
self._services_changed_token = None

logger.debug("closing requester")
self._requester.close()
self._requester = None

Expand All @@ -294,6 +294,7 @@ def handle_disconnect():
)
self._max_pdu_size_changed_token = None

logger.debug("closing session")
self._session.close()
self._session = None

Expand Down Expand Up @@ -360,29 +361,76 @@ def max_pdu_size_changed_handler(sender: GattSession, args):
max_pdu_size_changed_handler
)

# Windows does not support explicitly connecting to a device.
# Instead it has the concept of a GATT session that is owned
# by the calling program.
self._session.maintain_connection = True
# This keeps the device connected until we set maintain_connection = False.

wait_connect_task = asyncio.create_task(event.wait())
services_changed_event = asyncio.Event()
self._services_changed_events.append(services_changed_event)

try:
# Windows does not support explicitly connecting to a device.
# Instead it has the concept of a GATT session that is owned
# by the calling program.
self._session.maintain_connection = True
# This keeps the device connected until we set maintain_connection = False.

# if we receive a services changed event before get_gatt_services_async()
# finishes, we need to call it again with BluetoothCacheMode.CACHED
# to ensure we have the correct services as described in
# https://learn.microsoft.com/en-us/uwp/api/windows.devices.bluetooth.bluetoothledevice.gattserviceschanged
cache_mode = None

if self._use_cached_services is not None:
cache_mode = (
BluetoothCacheMode.CACHED
if self._use_cached_services
else BluetoothCacheMode.UNCACHED
)

async with async_timeout.timeout(timeout):
while True:
services_changed_event_task = asyncio.create_task(
services_changed_event.wait()
)

get_services_task = asyncio.create_task(
self.get_services(cache_mode=cache_mode)
)

_, pending = await asyncio.wait(
[services_changed_event_task, get_services_task],
return_when=asyncio.FIRST_COMPLETED,
)

for p in pending:
p.cancel()

if not services_changed_event.is_set():
# services did not change while getting services,
# so this is the final result
self.services = get_services_task.result()
self._services_resolved = True
break

logger.debug(
"%s: restarting get services due to services changed event",
self.address,
)
cache_mode = BluetoothCacheMode.CACHED
services_changed_event.clear()

# ensure the task ran to completion to avoid OSError
# on next call to get_services()
try:
await get_services_task
except OSError:
pass
except asyncio.CancelledError:
pass

# a connection may not be made until we request info from the
# device, so we have to get services before the GATT session
# is set to active
wait_get_services_task = asyncio.create_task(self.get_services())

try:
# wait for the session to become active
async with async_timeout.timeout(timeout):
await asyncio.gather(wait_connect_task, wait_get_services_task)

finally:
wait_get_services_task.cancel()
await event.wait()
finally:
wait_connect_task.cancel()
self._services_changed_events.remove(services_changed_event)

except BaseException:
handle_disconnect()
Expand Down Expand Up @@ -545,7 +593,9 @@ async def unpair(self) -> bool:

# GATT services methods

async def get_services(self, **kwargs) -> BleakGATTServiceCollection:
async def get_services(
self, *, cache_mode: Optional[BluetoothCacheMode] = None, **kwargs
) -> BleakGATTServiceCollection:
"""Get all services registered for this GATT server.
Returns:
Expand All @@ -557,7 +607,9 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection:
if self._services_resolved:
return self.services

logger.debug("Get Services...")
logger.debug("getting services (cache_mode=%r)...", cache_mode)

new_services = BleakGATTServiceCollection()

# Each of the get_serv/char/desc_async() methods has two forms, one
# with no args and one with a cache_mode argument
Expand All @@ -567,49 +619,11 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection:
# was created, the we use the second form with explicit cache mode.
# Otherwise we use the first form with no explicit cache mode which
# allows the OS Bluetooth stack to decide what is best.
if self._use_cached_services is not None:
args.append(
BluetoothCacheMode.CACHED
if self._use_cached_services
else BluetoothCacheMode.UNCACHED
)

# if we receive a services changed event before get_gatt_services_async()
# finishes, we need to call it again with BluetoothCacheMode.UNCACHED
# to ensure we have the correct services as described in
# https://learn.microsoft.com/en-us/uwp/api/windows.devices.bluetooth.bluetoothledevice.gattserviceschanged
while True:
services_changed_event = asyncio.Event()
services_changed_event_task = asyncio.create_task(
services_changed_event.wait()
)
self._services_changed_events.append(services_changed_event)

get_services_task = FutureLike(
self._requester.get_gatt_services_async(*args)
)

try:
await asyncio.wait(
[services_changed_event_task, get_services_task],
return_when=asyncio.FIRST_COMPLETED,
)
finally:
services_changed_event_task.cancel()
self._services_changed_events.remove(services_changed_event)
get_services_task.cancel()

if not services_changed_event.is_set():
break

logger.debug(
"%s: restarting get services due to services changed event",
self.address,
)
args = [BluetoothCacheMode.CACHED]
if cache_mode is not None:
args.append(cache_mode)

services: Sequence[GattDeviceService] = _ensure_success(
get_services_task.result(),
await FutureLike(self._requester.get_gatt_services_async(*args)),
"services",
"Could not get GATT services",
)
Expand All @@ -621,38 +635,37 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection:
if service.uuid in _ACCESS_DENIED_SERVICES:
continue

self.services.add_service(BleakGATTServiceWinRT(service))
new_services.add_service(BleakGATTServiceWinRT(service))

characteristics: Sequence[GattCharacteristic] = _ensure_success(
await service.get_characteristics_async(*args),
await FutureLike(service.get_characteristics_async(*args)),
"characteristics",
f"Could not get GATT characteristics for {service}",
)

for characteristic in characteristics:
self.services.add_characteristic(
new_services.add_characteristic(
BleakGATTCharacteristicWinRT(
characteristic, self._session.max_pdu_size - 3
)
)

descriptors: Sequence[GattDescriptor] = _ensure_success(
await characteristic.get_descriptors_async(*args),
await FutureLike(characteristic.get_descriptors_async(*args)),
"descriptors",
f"Could not get GATT descriptors for {service}",
)

for descriptor in descriptors:
self.services.add_descriptor(
new_services.add_descriptor(
BleakGATTDescriptorWinRT(
descriptor,
str(characteristic.uuid),
characteristic.attribute_handle,
)
)

self._services_resolved = True
return self.services
return new_services

# I/O methods

Expand Down Expand Up @@ -914,29 +927,53 @@ class FutureLike:
Needed until https://github.com/pywinrt/pywinrt/issues/14
"""

_asyncio_future_blocking = True
_asyncio_future_blocking = False

def __init__(self, async_result: IAsyncOperation) -> None:
self._async_result = async_result
def __init__(self, op: IAsyncOperation) -> None:
self._op = op
self._callbacks = []
self._loop = asyncio.get_running_loop()
self._cancel_requested = False
self._result = None

def call_callbacks(op: IAsyncOperation, status: AsyncStatus):
def call_callbacks():
for c in self._callbacks:
c(self)

async_result.completed = functools.partial(
self._loop.call_soon_threadsafe, call_callbacks
)
def call_callbacks_threadsafe(op: IAsyncOperation, status: AsyncStatus):
if status == AsyncStatus.COMPLETED:
# have to get result on this thread, otherwise it may not return correct value
self._result = op.get_results()

self._loop.call_soon_threadsafe(call_callbacks)

op.completed = call_callbacks_threadsafe

def result(self) -> Any:
return self._async_result.get_results()
if self._op.status == AsyncStatus.STARTED:
raise asyncio.InvalidStateError

if self._op.status == AsyncStatus.COMPLETED:
if self._cancel_requested:
raise asyncio.CancelledError

return self._result

if self._op.status == AsyncStatus.CANCELED:
raise asyncio.CancelledError

if self._op.status == AsyncStatus.ERROR:
if self._cancel_requested:
raise asyncio.CancelledError

error_code = self._op.error_code.value
pythonapi.PyErr_SetFromWindowsErr(error_code)

def done(self) -> bool:
return self._async_result.status != AsyncStatus.STARTED
return self._op.status != AsyncStatus.STARTED

def cancelled(self) -> bool:
return self._async_result.status == AsyncStatus.CANCELED
return self._cancel_requested or self._op.status == AsyncStatus.CANCELED

def add_done_callback(self, callback, *, context=None) -> None:
self._callbacks.append(callback)
Expand All @@ -945,23 +982,47 @@ def remove_done_callback(self, callback) -> None:
self._callbacks.remove(callback)

def cancel(self, msg=None) -> bool:
if self._async_result.status != AsyncStatus.STARTED:
if self._cancel_requested or self._op.status != AsyncStatus.STARTED:
return False
self._async_result.cancel()

self._cancel_requested = True
self._op.cancel()

return True

def exception(self) -> Optional[Exception]:
if self._async_result.status == AsyncStatus.STARTED:
if self._op.status == AsyncStatus.STARTED:
raise asyncio.InvalidStateError
if self._async_result.status == AsyncStatus.COMPLETED:

if self._op.status == AsyncStatus.COMPLETED:
if self._cancel_requested:
raise asyncio.CancelledError

return None
if self._async_result.status == AsyncStatus.CANCELED:

if self._op.status == AsyncStatus.CANCELED:
raise asyncio.CancelledError
if self._async_result.status == AsyncStatus.ERROR:

if self._op.status == AsyncStatus.ERROR:
if self._cancel_requested:
raise asyncio.CancelledError

error_code = self._op.error_code.value

try:
pythonapi.PyErr_SetFromWindowsErr(self._async_result.error_code)
pythonapi.PyErr_SetFromWindowsErr(error_code)
except OSError as e:
return e

def get_loop(self) -> asyncio.AbstractEventLoop:
return self._loop

def __await__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.

if not self.done():
raise RuntimeError("await wasn't used with future")

return self.result() # May raise too.

0 comments on commit 6608999

Please sign in to comment.