Skip to content

Commit

Permalink
Remove the blocking wait during message routing (#413)
Browse files Browse the repository at this point in the history
* Remove the blocking wait during message routing

* Make time_remaining computation explicit to avoid confusion
  • Loading branch information
mdickinson committed Jul 16, 2021
1 parent 61a37a7 commit 018b8ab
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
48 changes: 32 additions & 16 deletions traits_futures/multiprocessing_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,22 +383,15 @@ def route_until(self, condition, timeout=None):

if timeout is None:
while not condition():
self._route_message()
self._route_message(block=True)
else:
end_time = time.monotonic() + timeout
while not condition():
time_remaining = end_time - time.monotonic()
if time_remaining < 0.0:
break
try:
self._route_message(timeout=time_remaining)
except queue.Empty:
break
else:
# Success: condition became true.
return

raise RuntimeError("Timed out waiting for messages")
try:
while not condition():
time_remaining = end_time - time.monotonic()
self._route_message(block=True, timeout=time_remaining)
except queue.Empty:
raise RuntimeError("Timed out waiting for messages")

# Public traits ###########################################################

Expand Down Expand Up @@ -468,8 +461,31 @@ def _unlink_from_event_loop(self):
self._pingee.disconnect()
self._linked = False

def _route_message(self, timeout=None):
connection_id, message = self._local_message_queue.get(timeout=timeout)
def _route_message(self, *, block=False, timeout=None):
"""
Get and dispatch a message from the local message queue.
Parameters
----------
block : bool, optional
If True, block until either a message arrives or until timeout. If
False (the default), we expect a message to already be present in
the queue.
timeout : float, optional
Maximum time to wait for a message to arrive. If no timeout
is given and ``block`` is True, wait indefinitely. If ``block``
is False, this parameter is ignored.
Raises
------
queue.Empty
If no message arrives within the given timeout.
"""
if block and timeout is not None and timeout <= 0.0:
raise queue.Empty
connection_id, message = self._local_message_queue.get(
block=block, timeout=timeout
)
try:
receiver = self._receivers[connection_id]
except KeyError:
Expand Down
42 changes: 29 additions & 13 deletions traits_futures/multithreading_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,22 +336,15 @@ def route_until(self, condition, timeout=None):

if timeout is None:
while not condition():
self._route_message()
self._route_message(block=True)
else:
end_time = time.monotonic() + timeout
while not condition():
time_remaining = end_time - time.monotonic()
if time_remaining < 0.0:
break
try:
self._route_message(timeout=time_remaining)
time_remaining = end_time - time.monotonic()
self._route_message(block=True, timeout=time_remaining)
except queue.Empty:
break
else:
# Success: condition became true.
return

raise RuntimeError("Timed out waiting for messages")
raise RuntimeError("Timed out waiting for messages")

# Public traits ###########################################################

Expand Down Expand Up @@ -412,8 +405,31 @@ def _unlink_from_event_loop(self):
self._pingee.disconnect()
self._linked = False

def _route_message(self, timeout=None):
connection_id, message = self._message_queue.get(timeout=timeout)
def _route_message(self, *, block=False, timeout=None):
"""
Get and dispatch a message from the local message queue.
Parameters
----------
block : bool, optional
If True, block until either a message arrives or until timeout. If
False (the default), we expect a message to already be present in
the queue.
timeout : float, optional
Maximum time to wait for a message to arrive. If no timeout
is given and ``block`` is True, wait indefinitely. If ``block``
is False, this parameter is ignored.
Raises
------
queue.Empty
If no message arrives within the given timeout.
"""
if block and timeout is not None and timeout <= 0.0:
raise queue.Empty
connection_id, message = self._message_queue.get(
block=block, timeout=timeout
)
try:
receiver = self._receivers[connection_id]
except KeyError:
Expand Down

0 comments on commit 018b8ab

Please sign in to comment.