Skip to content

Commit

Permalink
Fix behaviour of route_until with timeout and queued messages (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdickinson committed Jul 17, 2021
1 parent 6bd09ef commit de7c81c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
5 changes: 2 additions & 3 deletions traits_futures/multiprocessing_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,9 @@ def _route_message(self, *, block=False, timeout=None):
queue.Empty
If no message arrives within the given timeout.
"""
if block and timeout is not None and timeout <= 0.0:
raise queue.Empty
connection_id, message = self._local_message_queue.get(
block=block, timeout=timeout
block=block,
timeout=None if timeout is None else max(timeout, 0.0),
)
try:
receiver = self._receivers[connection_id]
Expand Down
5 changes: 2 additions & 3 deletions traits_futures/multithreading_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,9 @@ def _route_message(self, *, block=False, timeout=None):
queue.Empty
If no message arrives within the given timeout.
"""
if block and timeout is not None and timeout <= 0.0:
raise queue.Empty
connection_id, message = self._message_queue.get(
block=block, timeout=timeout
block=block,
timeout=None if timeout is None else max(timeout, 0.0),
)
try:
receiver = self._receivers[connection_id]
Expand Down
26 changes: 26 additions & 0 deletions traits_futures/tests/i_message_router_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,32 @@ def test_route_until_second_call_after_timeout(self):
# an unusable state.
router.route_until(lambda: True, timeout=SAFETY_TIMEOUT)

def test_route_until_timeout_with_queued_messages(self):
# Even with a timeout of 0.0, route_until shouldn't fail if
# there are sufficient messages already queued to satisfy
# the condition.
messages = list(range(10))

with self.started_router() as router:
sender, receiver = router.pipe()
listener = ReceiverListener(receiver=receiver)
try:
with self.context.worker_pool() as worker_pool:
worker_pool.submit(send_messages, sender, messages)

# At this point we've closed the worker pool, so all
# background jobs have completed and all messages are
# already queued. route_until should be able to
# process all of them, regardless of timeout.
router.route_until(
lambda: len(listener.messages) >= len(messages),
timeout=0.0,
)
finally:
router.close_pipe(receiver)

self.assertEqual(listener.messages, messages)

def test_event_loop_after_route_until(self):
# This tests a potentially problematic situation:
# - route_until processes at least one message manually
Expand Down

0 comments on commit de7c81c

Please sign in to comment.