Skip to content

Commit

Permalink
Feature: IMessageRouter.route_until (#378)
Browse files Browse the repository at this point in the history
* Stash

* Add GuiTestAssistant.exercise_event_loop

* Remove unused timeout

* Unlink from the event loop on first call to route_until

* Add docstrings

* Add route_until docstrings

* Remove out-of-date comment

* Update traits_futures/i_message_router.py

Co-authored-by: Poruri Sai Rahul <rporuri@enthought.com>

* Update traits_futures/multithreading_router.py

Co-authored-by: Poruri Sai Rahul <rporuri@enthought.com>

* Update traits_futures/multiprocessing_router.py

Co-authored-by: Poruri Sai Rahul <rporuri@enthought.com>

* Add regression test; clean up send_messages_from_worker

* Fix tests to use existing worker_pool machinery; additional test

* Fix re-use of timeout on each get operation

Co-authored-by: Poruri Sai Rahul <rporuri@enthought.com>
  • Loading branch information
mdickinson and Poruri Sai Rahul committed Jul 7, 2021
1 parent 839a775 commit 077e8b9
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 9 deletions.
27 changes: 27 additions & 0 deletions traits_futures/i_message_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,30 @@ def close_pipe(self, receiver):
RuntimeError
If the router is not currently running.
"""

@abc.abstractmethod
def route_until(self, condition, timeout=None):
"""
Manually drive the router until a given condition occurs, or timeout.
This is primarily used as part of a clean shutdown.
Note: this has the side-effect of moving the router from "event loop"
mode to "manual" mode. This mode switch is permanent, in the sense that
after this point, the router will no longer respond to pings: any
messages will need to be processed through this function.
Parameters
----------
condition : callable
Zero-argument callable returning a boolean. When this condition
becomes true, this method will stop routing messages. If the
condition is already true on entry, no messages will be routed.
timeout : float, optional
Maximum number of seconds to route messages for.
Raises
------
RuntimeError
If the condition did not become true before timeout.
"""
65 changes: 61 additions & 4 deletions traits_futures/multiprocessing_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import multiprocessing.managers
import queue
import threading
import time

from traits.api import (
Any,
Expand Down Expand Up @@ -286,8 +287,7 @@ def stop(self):
# reference.
self._process_message_queue = None

self._pingee.disconnect()
self._pingee = None
self._unlink_from_event_loop()

self._local_message_queue = None

Expand Down Expand Up @@ -359,6 +359,52 @@ def close_pipe(self, receiver):
f"{self} closed pipe #{connection_id} with receiver {receiver}"
)

def route_until(self, condition, timeout=None):
"""
Manually drive the router until a given condition occurs, or timeout.
This is primarily used as part of a clean shutdown.
Note: this has the side-effect of moving the router from "event loop"
mode to "manual" mode. This mode switch is permanent, in the sense that
after this point, the router will no longer respond to pings: any
messages will need to be processed through this function.
Parameters
----------
condition : callable
Zero-argument callable returning a boolean. When this condition
becomes true, this method will stop routing messages. If the
condition is already true on entry, no messages will be routed.
timeout : float, optional
Maximum number of seconds to route messages for.
Raises
------
RuntimeError
If the condition did not become true before timeout.
"""
self._unlink_from_event_loop()

if timeout is None:
while not condition():
self._route_message()
else:
end_time = time.monotonic() + timeout
while not condition():
time_remaining = end_time - time.monotonic()
if time_remaining < 0.0:
break
try:
self._route_message(timeout=time_remaining)
except queue.Empty:
break
else:
# Success: condition became true.
return

raise RuntimeError("Timed out waiting for messages")

# Public traits ###########################################################

#: The event loop used to trigger message dispatch.
Expand Down Expand Up @@ -392,8 +438,19 @@ def close_pipe(self, receiver):

# Private methods #########################################################

def _route_message(self):
connection_id, message = self._local_message_queue.get()
def _unlink_from_event_loop(self):
"""
Unlink this router from the event loop.
After this call, the router will no longer react to any pending
tasks on the event loop.
"""
if self._pingee is not None:
self._pingee.disconnect()
self._pingee = None

def _route_message(self, timeout=None):
connection_id, message = self._local_message_queue.get(timeout=timeout)
try:
receiver = self._receivers[connection_id]
except KeyError:
Expand Down
65 changes: 61 additions & 4 deletions traits_futures/multithreading_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import itertools
import logging
import queue
import time

from traits.api import (
Any,
Expand Down Expand Up @@ -236,8 +237,7 @@ def stop(self):
if self._receivers:
logger.warning(f"{self} has {len(self._receivers)} unclosed pipes")

self._pingee.disconnect()
self._pingee = None
self._unlink_from_event_loop()

self._message_queue = None

Expand Down Expand Up @@ -309,6 +309,52 @@ def close_pipe(self, receiver):
f"{self} closed pipe #{connection_id} with receiver {receiver}"
)

def route_until(self, condition, timeout=None):
"""
Manually drive the router until a given condition occurs, or timeout.
This is primarily used as part of a clean shutdown.
Note: this has the side-effect of moving the router from "event loop"
mode to "manual" mode. This mode switch is permanent, in the sense that
after this point, the router will no longer respond to pings: any
messages will need to be processed through this function.
Parameters
----------
condition : callable
Zero-argument callable returning a boolean. When this condition
becomes true, this method will stop routing messages. If the
condition is already true on entry, no messages will be routed.
timeout : float, optional
Maximum number of seconds to route messages for.
Raises
------
RuntimeError
If the condition did not become true before timeout.
"""
self._unlink_from_event_loop()

if timeout is None:
while not condition():
self._route_message()
else:
end_time = time.monotonic() + timeout
while not condition():
time_remaining = end_time - time.monotonic()
if time_remaining < 0.0:
break
try:
self._route_message(timeout=time_remaining)
except queue.Empty:
break
else:
# Success: condition became true.
return

raise RuntimeError("Timed out waiting for messages")

# Public traits ###########################################################

#: The event loop used to trigger message dispatch.
Expand All @@ -333,8 +379,19 @@ def close_pipe(self, receiver):

# Private methods #########################################################

def _route_message(self):
connection_id, message = self._message_queue.get()
def _unlink_from_event_loop(self):
"""
Unlink this router from the event loop.
After this call, the router will no longer react to any pending
tasks on the event loop.
"""
if self._pingee is not None:
self._pingee.disconnect()
self._pingee = None

def _route_message(self, timeout=None):
connection_id, message = self._message_queue.get(timeout=timeout)
try:
receiver = self._receivers[connection_id]
except KeyError:
Expand Down
114 changes: 113 additions & 1 deletion traits_futures/tests/i_message_router_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import contextlib
import logging
import threading
import time

from traits.api import Any, HasStrictTraits, Instance, List, observe, Str

Expand All @@ -26,7 +27,7 @@
SAFETY_TIMEOUT = 10.0


def send_messages(sender, messages):
def send_messages(sender, messages, message_delay=None):
"""
Send a sequence of messages using the given sender.
Expand All @@ -36,10 +37,14 @@ def send_messages(sender, messages):
The sender to use to send messages.
messages : list
List of objects to send.
message_delay : float, optional
If given, the time to sleep before sending each message.
"""
sender.start()
try:
for message in messages:
if message_delay is not None:
time.sleep(message_delay)
sender.send(message)
finally:
sender.stop()
Expand Down Expand Up @@ -284,6 +289,113 @@ def test_sender_restart(self):
with self.assertRaises(RuntimeError):
sender.start()

def test_route_until(self):
# When we've sent messages (e.g., from a background thread), we can
# drive the router manually to receive those messages.

messages = ["abc", "def"]

with self.context.worker_pool() as worker_pool:
with self.started_router() as router:
sender, receiver = router.pipe()
try:
listener = ReceiverListener(receiver=receiver)
worker_pool.submit(send_messages, sender, messages)
router.route_until(
lambda: len(listener.messages) >= len(messages),
timeout=SAFETY_TIMEOUT,
)
finally:
router.close_pipe(receiver)

self.assertEqual(listener.messages, messages)

def test_route_until_without_timeout(self):
# This test is mildly dangerous: if something goes wrong, it could
# hang indefinitely. xref: enthought/traits-futures#310

messages = ["abc"]

with self.context.worker_pool() as worker_pool:
with self.started_router() as router:
sender, receiver = router.pipe()
try:
listener = ReceiverListener(receiver=receiver)

worker_pool.submit(send_messages, sender, messages)

router.route_until(
lambda: len(listener.messages) >= len(messages),
)
finally:
router.close_pipe(receiver)

self.assertEqual(listener.messages, messages)

def test_route_until_timeout(self):
with self.started_router() as router:
start_time = time.monotonic()
with self.assertRaises(RuntimeError):
router.route_until(lambda: False, timeout=0.1)
actual_timeout = time.monotonic() - start_time

self.assertLess(actual_timeout, 1.0)

def test_route_until_timeout_with_repeated_messages(self):
# Check for one plausible variety of implementation bug: re-using
# the original timeout on each message get operation instead of
# re-calculating the time to wait.
with self.context.worker_pool() as worker_pool:
with self.started_router() as router:
sender, receiver = router.pipe()
try:
listener = ReceiverListener(receiver=receiver)

# Send one seconds' worth of messages, but timeout after
# 0.1 seconds.
worker_pool.submit(
send_messages, sender, range(20), message_delay=0.05
)
with self.assertRaises(RuntimeError):
router.route_until(lambda: False, timeout=0.1)
finally:
router.close_pipe(receiver)

# With a timeout of 0.1 and only one message sent every 0.05 seconds,
# we should have got at most 2 messages before timeout. To avoid
# spurious failures due to timing variations, we allow up to 4.
self.assertLessEqual(len(listener.messages), 4)

def test_route_until_second_call_after_timeout(self):
with self.started_router() as router:
with self.assertRaises(RuntimeError):
router.route_until(lambda: False, timeout=0.1)
# Just check that the first call hasn't put the router into
# an unusable state.
router.route_until(lambda: True, timeout=SAFETY_TIMEOUT)

def test_event_loop_after_route_until(self):
# This tests a potentially problematic situation:
# - route_until processes at least one message manually
# - the 'ping' for that message is still on the event loop
# - when the event loop is started, it tries to get that same message
# from the message queue, but no such message exists, so we end
# up blocking forever.
messages = ["abc"]

with self.context.worker_pool() as worker_pool:
with self.started_router() as router:
sender, receiver = router.pipe()
try:
listener = ReceiverListener(receiver=receiver)
worker_pool.submit(send_messages, sender, messages)
router.route_until(
lambda: len(listener.messages) >= len(messages),
)
self.exercise_event_loop()
finally:
router.close_pipe(receiver)

# Helper functions and assertions

@contextlib.contextmanager
Expand Down

0 comments on commit 077e8b9

Please sign in to comment.