Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bluez: Allow accessing Bleak from multiple individual event loops (not simultaneously) #1031

Conversation

projectgus
Copy link
Contributor

Hi! I have a small patch to the Bluez backend for your consideration.

Background

In the default configuration, pytest-asyncio creates a new event loop to execute each test coroutine in a clean environment. Similarly, there can be other cases where more than one event loop exists during the lifetime of a Python program.

If Bleak object are instantiated in the context of more than one active event loop then a RuntimeError appears in the second event loop, thrown from the dbus-fast layer. This happens even if all Bleak objects associated with the first event loop have already been garbage collected.

Steps to Reproduce

If pytest-asyncio is also installed:

import asyncio
from bleak import BleakScanner

def scanner_callback(device, adv_data):
    print(device)

async def test_a():
    async with BleakScanner():
        await asyncio.sleep(1)

async def test_b():
    async with BleakScanner():
        await asyncio.sleep(1)

Error looks like:

=================================================================== test session starts ====================================================================
platform linux -- Python 3.10.7, pytest-7.1.2, pluggy-1.0.0
rootdir: /path/to/tests, configfile: pytest.ini
plugins: aiohttp-1.0.4, asyncio-0.18.3
asyncio: mode=auto
collected 2 items                                                                                                                                          

test_bleak.py::test_a PASSED                                                                                                                         [ 50%]
test_bleak.py::test_b FAILED                                                                                                                         [100%]

========================================================================= FAILURES =========================================================================
__________________________________________________________________________ test_b __________________________________________________________________________

    async def test_b():
>       async with BleakScanner():

test_bleak.py:12: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../bleak/bleak/__init__.py:101: in __aenter__
    await self._backend.start()
../../bleak/bleak/backends/bluezdbus/scanner.py:150: in start
    self._stop = await manager.active_scan(
../../bleak/bleak/backends/bluezdbus/manager.py:345: in active_scan
    reply = await self._bus.call(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dbus_fast.aio.message_bus.MessageBus object at 0x7f3809791690>, msg = <dbus_fast.message.Message object at 0x7f3808f2b370>

    async def call(self, msg: Message) -> Optional[Message]:
        """Send a method call and wait for a reply from the DBus daemon.
    
        :param msg: The method call message to send.
        :type msg: :class:`Message <dbus_fast.Message>`
    
        :returns: A message in reply to the message sent. If the message does
            not expect a reply based on the message flags or type, returns
            ``None`` after the message is sent.
        :rtype: :class:`Message <dbus_fast.Message>` or :class:`None` if no reply is expected.
    
        :raises:
            - :class:`Exception` - If a connection error occurred.
        """
        if (
            msg.flags & MessageFlag.NO_REPLY_EXPECTED
            or msg.message_type is not MessageType.METHOD_CALL
        ):
            await self.send(msg)
            return None
    
        future = self._loop.create_future()
    
        def reply_handler(reply, err):
            if not future.done():
                if err:
                    _future_set_exception(future, err)
                else:
                    _future_set_result(future, reply)
    
        self._call(msg, reply_handler, check_callback=False)
    
>       await future
E       RuntimeError: Task <Task pending name='Task-2' coro=<test_b() running at /path/test_bleak.py:12> cb=[_run_until_complete_cb() at /usr/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

../../dbus-fast/src/dbus_fast/aio/message_bus.py:354: RuntimeError

Root Cause

As far as I can tell, MessageBus needs to hold a lifetime reference to a single event loop because of MessageWriter, which is associated with one particular event loop.

This solution

Save the current event loop associated when get_global_bluez_manager() is first called. If it's called again from a new event loop, and the saved event loop is closed, then instantiate a new BlueZManager instance.

I'm not sure that is the best solution, but it does allow continuing to use Bleak in the context of a new event loop and with pytest-asyncio in the default configuration.

It's still not possible to have Bleak objects accessed in the context of more than one event loop simultaneously, but the code will now log a warning if this happens.

Other possible solutions

I have another branch which implements a per-event-loop cache of the Manager objects. This works alright, but there are still concurrency issues with trying to use Bleak from multiple event loops simultaneously. I don't know enough about DBus to quickly debug these, and it seems like a much less practical use case, but I'm happy to look into it more if you prefer that approach.

Another possible solution would be to have a global deinit function for Bleak, that could be called between executions? It's possible to mock this up now by calling del get_global_bluez_manager.instance and this works, so a clean wrapper around this would also solve this problem (and leaves managing any leftover Bleak objects to the caller.)

Previous event loop needs to be closed for this to work, as the rest
of the DBus access code doesn't support concurrent access from more than one
MessageBus instance.

However this allows creating per-test Bleak classes with pytest-asyncio that
don't overlap each other, and possibly other use cases as well.
@projectgus projectgus force-pushed the bugfix/bluez_manager_new_event_loop branch from 1c581c0 to d00aaf4 Compare September 26, 2022 10:43
@projectgus
Copy link
Contributor Author

I have another branch which implements a per-event-loop cache of the Manager objects. This works alright, but there are still concurrency issues with trying to use Bleak from multiple event loops simultaneously.

Looked at this some more, it seems like the only real limitation is that bluetoothd sometimes doesn't reply when it gets requests from multiple clients at the same time. So the limitations to using Bleak from multiple event loops simultaneously may be exactly the same as the limitations to using Bleak from multiple independent processes simultaneously.

The remaining fix there seems like it would be to add timeout and/or retry capability to MessageBus.call(), for the cases where bluetoothd sees two requests at roughly the same time from two sender addresses, and only responds to one of them.

Let me know if you like the idea of the manager-per-event-loop better, and I'll clean that branch and submit it instead.

@dlech
Copy link
Collaborator

dlech commented Sep 26, 2022

My first instinct is to say that the per-event-loop manager object seems like the most straight-forward solution. (Could we use the contextvars for this?)

The remaining fix there seems like it would be to add timeout and/or retry capability to MessageBus.call(), for the cases where bluetoothd sees two requests at roughly the same time from two sender addresses, and only responds to one of them.

This seems like something that should be fixed in BlueZ rather than trying to hack around it.

@projectgus
Copy link
Contributor Author

My first instinct is to say that the per-event-loop manager object seems like the most straight-forward solution.

Good point, and now that I understand parallel access is not a limitation on the Bleak side then I agree. Will close this PR and submit another.

(Could we use the contextvars for this?)

Good suggestion, and I was not aware of this feature so thanks for showing me something useful! However, after looking at the docs I don't think so: in asyncio the context is applied per-execution not across an entire event loop.

The remaining fix there seems like it would be to add timeout and/or retry capability to MessageBus.call(), for the cases where bluetoothd sees two requests at roughly the same time from two sender addresses, and only responds to one of them.

This seems like something that should be fixed in BlueZ rather than trying to hack around it.

That's fair. I would suggest it's still valuable to have an RPC timeout on the Bleak side, as the current implementation will hang indefinitely on "await future" if bluez doesn't reply for any reason. Then at least Bleak can push the error back to the caller, with a clear "bluez isn't responding" error message rather than a mystery hang - similar philosophy to the way that the current implementation calls assert_reply(reply) to check for a valid reply, even though in an ideal world Bluez will always send a valid reply! 😁

That said, I think the best way to do this would be to patch dbus-fast with an optional timeout argument on call() and then consider setting that parameter from the Bleak side after it's available. Will see if I'm inspired enough to submit a patch. :)

@projectgus projectgus closed this Sep 26, 2022
@projectgus projectgus deleted the bugfix/bluez_manager_new_event_loop branch September 27, 2022 03:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants