Skip to content

Commit

Permalink
Added catch_anywhere to dbus signals
Browse files Browse the repository at this point in the history
Creates an async iterator which yields object path that emitted
signal and signal data.

Can be called from class but requires explicit service name
in that case.
  • Loading branch information
igo95862 committed Jun 26, 2022
1 parent 46642d8 commit e8dcaa0
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 6 deletions.
22 changes: 22 additions & 0 deletions docs/asyncio_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,28 @@ Decorators
Signal objects can also be async iterated directly:
``async for x in something.some_signal``

.. py:method:: catch_anywhere(service_name, bus)
Catch signal independent of path.
Yields tuple of path of the object that emitted signal and signal data.

``async for path, data in something.some_signal.catch_anywhere():``

This method can be called from both an proxy object and class.
However, it cannot be called on local objects and will raise
``NotImplementedError``.

:param str service_name:
Service name of which signals belong to.
Required if called from class. When called from proxy object
the service name of the proxy will be used.

:param str bus:
Optional dbus connection object.
If not passed when called from proxy the bus connected
to proxy will be used or when called from class default
bus will be used.

.. py:method:: emit(args)
Emit a new signal with *args* data.
Expand Down
12 changes: 12 additions & 0 deletions docs/asyncio_quick.rst
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,18 @@ Example: ::

example_object.name_changed.emit('test')

Signals can also be caught from multiple D-Bus objects using
:py:meth:`catch_anywhere` method. The async iterator will yield
the path of the object that emitted the signal and the signal data.

:py:meth:`catch_anywhere` can be called from class but in such case
the service name must be provided.

Example::

async for path, x in ExampleInterface.name_changed('org.example.test'):
print(f"On {path} caught: {x}")

Subclass Overrides
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
53 changes: 51 additions & 2 deletions src/sdbus/dbus_proxy_async_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Generic,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
cast,
Expand All @@ -40,7 +41,8 @@
DbusSingalCommon,
DbusSomethingAsync,
)
from .sd_bus_internals import SdBusMessage
from .dbus_common_funcs import get_default_bus
from .sd_bus_internals import SdBus, SdBusMessage

T = TypeVar('T')

Expand All @@ -52,7 +54,7 @@
class DbusSignalAsync(DbusSomethingAsync, DbusSingalCommon, Generic[T]):

def __get__(self,
obj: DbusInterfaceBaseAsync,
obj: Optional[DbusInterfaceBaseAsync],
obj_class: Optional[Type[DbusInterfaceBaseAsync]] = None,
) -> DbusSignalBinded[T]:
return DbusSignalBinded(self, obj)
Expand Down Expand Up @@ -145,6 +147,53 @@ async def catch(self) -> AsyncGenerator[T, None]:

__aiter__ = catch

async def catch_anywhere(
self,
service_name: Optional[str] = None,
bus: Optional[SdBus] = None,
) -> AsyncGenerator[Tuple[str, T], None]:
if service_name is None:
if self.interface_ref is not None:
interface = self.interface_ref()
assert interface is not None
if interface._remote_service_name is None:
raise NotImplementedError(
'catch_anywhere not implemented for '
'local objects'
)

service_name = interface._remote_service_name
else:
raise ValueError(
'Called catch_anywhere from class '
'but service name was not provided'
)

if bus is None:
if self.interface_ref is not None:
interface = self.interface_ref()
assert interface is not None
assert interface._attached_bus is not None
bus = interface._attached_bus
else:
bus = get_default_bus()

message_queue = await bus.get_signal_queue_async(
service_name,
None,
self.dbus_signal.interface_name,
self.dbus_signal.signal_name,
)

while True:
next_signal_message = await message_queue.get()
signal_path = next_signal_message.path
assert signal_path is not None
yield (
signal_path,
cast(T, next_signal_message.get_contents())
)

def _emit_message(self, args: T) -> None:
assert self.interface_ref is not None, (
"Called method from class?"
Expand Down
77 changes: 73 additions & 4 deletions test/test_sd_bus_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,23 @@ class DbusErrorUnmappedLater(DbusFailedError):
dbus_error_name = 'org.example.Nothing'


TEST_SERVICE_NAME = 'org.example.test'


def initialize_object() -> Tuple[TestInterface, TestInterface]:
test_object = TestInterface()
test_object.export_to_dbus('/')

test_object_connection = TestInterface.new_proxy(
"org.example.test", '/')
TEST_SERVICE_NAME, '/')

return test_object, test_object_connection


class TestProxy(IsolatedDbusTestCase):
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
await self.bus.request_name_async("org.example.test", 0)
await self.bus.request_name_async(TEST_SERVICE_NAME, 0)

async def test_method_kwargs(self) -> None:
test_object, test_object_connection = initialize_object()
Expand Down Expand Up @@ -304,7 +307,7 @@ def test_property_setter(self, var: str) -> None:
self.assertEqual(await test_subclass.test_int(), 2)

test_subclass_connection = TestInheritnce.new_proxy(
"org.example.test", '/subclass', self.bus)
TEST_SERVICE_NAME, '/subclass', self.bus)

self.assertEqual(await test_subclass_connection.test_int(), 2)

Expand Down Expand Up @@ -378,6 +381,72 @@ async def test_signal(self) -> None:

self.assertEqual(test_tuple, await wait_for(q.get(), timeout=1))

async def test_signal_catch_anywhere(self) -> None:
test_object, test_object_connection = initialize_object()

loop = get_running_loop()

test_tuple = ('sgfsretg', 'asd')

with self.subTest('Catch anywhere over D-Bus object'):
async def catch_anywhere_oneshot_dbus(
) -> Tuple[str, Tuple[str, str]]:
async for x in test_object_connection.test_signal\
.catch_anywhere():
return x

raise RuntimeError

catch_anywhere_over_dbus_task \
= loop.create_task(catch_anywhere_oneshot_dbus())

await sleep(0)

test_object.test_signal.emit(test_tuple)

self.assertEqual(
('/', test_tuple),
await wait_for(catch_anywhere_over_dbus_task, timeout=1),
)

with self.subTest('Catch anywhere over D-Bus class'):
async def catch_anywhere_oneshot_from_class(
) -> Tuple[str, Tuple[str, str]]:
async for x in TestInterface.test_signal.catch_anywhere(
TEST_SERVICE_NAME, self.bus):
return x

raise RuntimeError

catch_anywhere_from_class_task \
= loop.create_task(catch_anywhere_oneshot_from_class())

await sleep(0)

test_object.test_signal.emit(test_tuple)

self.assertEqual(
('/', test_tuple),
await wait_for(catch_anywhere_from_class_task, timeout=1),
)

with self.subTest('Catch anywhere over local object'):
async def catch_anywhere_oneshot_local(
) -> Tuple[str, Tuple[str, str]]:
async for x in test_object.test_signal.catch_anywhere():
return x

raise RuntimeError

catch_anywhere_over_local_task \
= loop.create_task(catch_anywhere_oneshot_local())

with self.assertRaises(NotImplementedError):
await wait_for(
catch_anywhere_over_local_task,
timeout=1,
)

async def test_exceptions(self) -> None:
test_object, test_object_connection = initialize_object()

Expand Down Expand Up @@ -555,7 +624,7 @@ async def test_singal_queue_wildcard_match(self) -> None:
test_object, test_object_connection = initialize_object()

message_queue = await self.bus.get_signal_queue_async(
'org.example.test',
TEST_SERVICE_NAME,
None, None, None)

test_object.test_signal.emit(('test', 'signal'))
Expand Down

1 comment on commit e8dcaa0

@igo95862
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asmello you might make use of this.

Please sign in to comment.