From 79ad3630569ce7ca22a7e905cdf70c3f46e52b86 Mon Sep 17 00:00:00 2001 From: Philipp Trenz Date: Fri, 8 May 2026 14:13:48 +0200 Subject: [PATCH 1/3] Fix races and memory leak in ReceptionStatusAwaiter, assuming a single awaiter; resolves flexiblepower/s2-python#153 --- src/s2python/reception_status_awaiter.py | 44 +++++++++++++----------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/s2python/reception_status_awaiter.py b/src/s2python/reception_status_awaiter.py index 5c4bd42..d054045 100644 --- a/src/s2python/reception_status_awaiter.py +++ b/src/s2python/reception_status_awaiter.py @@ -13,6 +13,10 @@ class ReceptionStatusAwaiter: + """Notify coroutines waiting for a `ReceptionStatus` by subject message ID. + + Reception statuses are single-consumer: once awaited, they are removed.""" + received: Dict[uuid.UUID, ReceptionStatus] awaiting: Dict[uuid.UUID, asyncio.Event] @@ -23,22 +27,21 @@ def __init__(self) -> None: async def wait_for_reception_status( self, message_id: uuid.UUID, timeout_reception_status: float ) -> ReceptionStatus: - if message_id in self.received: - reception_status = self.received[message_id] - else: - if message_id in self.awaiting: - received_event = self.awaiting[message_id] - else: - received_event = asyncio.Event() - self.awaiting[message_id] = received_event - await asyncio.wait_for(received_event.wait(), timeout_reception_status) - reception_status = self.received[message_id] + existing = self.received.pop(message_id, None) + if existing is not None: + return existing - if message_id in self.awaiting: - del self.awaiting[message_id] + received_event = self.awaiting.get(message_id) + if received_event is None: + received_event = asyncio.Event() + self.awaiting[message_id] = received_event - return reception_status + try: + await asyncio.wait_for(received_event.wait(), timeout_reception_status) + return self.received.pop(message_id) + finally: + self.awaiting.pop(message_id, None) async def receive_reception_status(self, reception_status: ReceptionStatus) -> None: if not isinstance(reception_status, ReceptionStatus): @@ -46,15 +49,16 @@ async def receive_reception_status(self, reception_status: ReceptionStatus) -> N f"Expected a ReceptionStatus but received message {reception_status}" ) - if reception_status.subject_message_id in self.received: + mid = reception_status.subject_message_id + + if mid in self.received: raise RuntimeError( - f"ReceptationStatus for message_subject_id {reception_status.subject_message_id} has already " - f"been received!" + f"ReceptionStatus for message_subject_id {mid} has already been received!" ) - self.received[reception_status.subject_message_id] = reception_status - awaiting = self.awaiting.get(reception_status.subject_message_id) + self.received[mid] = reception_status - if awaiting: + awaiting = self.awaiting.get(mid) + if awaiting is not None: awaiting.set() - del self.awaiting[reception_status.subject_message_id] + self.awaiting.pop(mid, None) From a56d5c741686b12fab7db558a0f8f9a2dc1e9a93 Mon Sep 17 00:00:00 2001 From: Philipp Trenz Date: Fri, 8 May 2026 14:23:17 +0200 Subject: [PATCH 2/3] test: align ReceptionStatusAwaiter tests with single-consumer behavior --- tests/unit/reception_status_awaiter_test.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/unit/reception_status_awaiter_test.py b/tests/unit/reception_status_awaiter_test.py index fb06630..c6cb382 100644 --- a/tests/unit/reception_status_awaiter_test.py +++ b/tests/unit/reception_status_awaiter_test.py @@ -83,10 +83,7 @@ async def test__wait_for_reception_status__multiple_receive_while_waiting(self): should_be_waiting_still_1 = not wait_task_1.done() should_be_waiting_still_2 = not wait_task_2.done() await awaiter.receive_reception_status(s2_reception_status) - await wait_task_1 - await wait_task_2 - received_s2_reception_status_1 = wait_task_1.result() - received_s2_reception_status_2 = wait_task_2.result() + results = await asyncio.gather(wait_task_1, wait_task_2, return_exceptions=True) # Assert expected_s2_reception_status = ReceptionStatus( # pyright: ignore[reportCallIssue] @@ -95,8 +92,14 @@ async def test__wait_for_reception_status__multiple_receive_while_waiting(self): self.assertTrue(should_be_waiting_still_1) self.assertTrue(should_be_waiting_still_2) - self.assertEqual(expected_s2_reception_status, received_s2_reception_status_1) - self.assertEqual(expected_s2_reception_status, received_s2_reception_status_2) + + successful_results = [result for result in results if not isinstance(result, Exception)] + exception_results = [result for result in results if isinstance(result, Exception)] + + self.assertEqual(1, len(successful_results)) + self.assertEqual(1, len(exception_results)) + self.assertEqual(expected_s2_reception_status, successful_results[0]) + self.assertIsInstance(exception_results[0], TimeoutError) async def test__receive_reception_status__wrong_message(self): # Arrange From c64730f4a19f585b8d8c4787c767ca986a67c574 Mon Sep 17 00:00:00 2001 From: Philipp Trenz Date: Fri, 8 May 2026 16:41:15 +0200 Subject: [PATCH 3/3] handle asyncio TimeoutError compatibility in awaiter test and async connection --- src/s2python/connection/async_/connection.py | 2 +- tests/unit/reception_status_awaiter_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/s2python/connection/async_/connection.py b/src/s2python/connection/async_/connection.py index 9f225c8..d5c4cdd 100644 --- a/src/s2python/connection/async_/connection.py +++ b/src/s2python/connection/async_/connection.py @@ -247,7 +247,7 @@ async def send_msg_and_await_reception_status( if reception_status_task in done: try: reception_status = await reception_status_task - except TimeoutError: + except (TimeoutError, asyncio.TimeoutError): logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) self._stop_event.set() raise diff --git a/tests/unit/reception_status_awaiter_test.py b/tests/unit/reception_status_awaiter_test.py index c6cb382..5e27c83 100644 --- a/tests/unit/reception_status_awaiter_test.py +++ b/tests/unit/reception_status_awaiter_test.py @@ -99,7 +99,7 @@ async def test__wait_for_reception_status__multiple_receive_while_waiting(self): self.assertEqual(1, len(successful_results)) self.assertEqual(1, len(exception_results)) self.assertEqual(expected_s2_reception_status, successful_results[0]) - self.assertIsInstance(exception_results[0], TimeoutError) + self.assertIsInstance(exception_results[0], asyncio.TimeoutError) async def test__receive_reception_status__wrong_message(self): # Arrange