Skip to content

Commit

Permalink
test: Test signal with multiple readers
Browse files Browse the repository at this point in the history
This checks that messages are properly rewinded between readers.
  • Loading branch information
igo95862 committed Oct 28, 2023
1 parent 65b257d commit dda8224
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions test/test_sd_bus_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,33 @@ async def catch_anywhere_oneshot_local(
timeout=1,
)

async def test_signal_multiple_readers(self) -> None:
test_object, test_object_connection = initialize_object()

loop = get_running_loop()

test_tuple = ('sgfsretg', 'asd')

async def reader_one() -> Tuple[str, str]:
async for x in test_object_connection.test_signal.catch():
return test_tuple

raise RuntimeError

async def reader_two() -> Tuple[str, str]:
async for x in test_object_connection.test_signal.catch():
return test_tuple

raise RuntimeError

t1 = loop.create_task(reader_one())
t2 = loop.create_task(reader_two())

loop.call_at(0, test_object.test_signal.emit, test_tuple)

self.assertEqual(test_tuple, await wait_for(t1, timeout=1))
self.assertEqual(test_tuple, await wait_for(t2, timeout=1))

async def test_exceptions(self) -> None:
test_object, test_object_connection = initialize_object()

Expand Down

0 comments on commit dda8224

Please sign in to comment.