Skip to content

Commit

Permalink
Route messages during executor shutdown (#380)
Browse files Browse the repository at this point in the history
* Switch from waiting for futures to manually running the router

* Remove now-unused machinery

* Update traits_futures/traits_executor.py

* Remove the 'unlink' step
  • Loading branch information
mdickinson committed Jul 8, 2021
1 parent 077e8b9 commit 525fb31
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 47 deletions.
6 changes: 0 additions & 6 deletions docs/source/guide/intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ method performs the following tasks, in order:

* Moves the executor to |STOPPING| state.
* Requests cancellation of all waiting or executing background tasks.
* Unlinks all background tasks from their associated futures: the
futures will receive no further updates after |shutdown| returns.
* Waits for all background tasks to complete.
* Shuts down the worker pool (if that worker pool is owned by the executor).
* Moves the executor to |STOPPED| state.
Expand All @@ -302,10 +300,6 @@ If called on an executor in |STOPPED| state, |shutdown| simply returns
without taking any action. If called on an executor in |STOPPING| state,
any of the above actions that have not already been taken will be taken.

Note that because of the unlinking of the background tasks and their
associated futures, background tasks that have been cancelled will leave their
associated futures in |CANCELLING| state. Those futures will never reach
|CANCELLED| state, even under a running event loop.

Shutdown with a timeout
~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
6 changes: 3 additions & 3 deletions traits_futures/tests/traits_executor_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ def test_shutdown_does_nothing_if_stopped(self):
def test_shutdown_cancels_running_futures(self):
future = submit_call(self.executor, pow, 3, 5)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(future.state, CANCELLING)
self.assertEqual(future.state, CANCELLED)
self.assertTrue(self.executor.stopped)

def test_no_future_updates_after_shutdown(self):
future = submit_call(self.executor, pow, 3, 5)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(future.state, CANCELLING)
self.assertEqual(future.state, CANCELLED)
self.exercise_event_loop()
self.assertEqual(future.state, CANCELLING)
self.assertEqual(future.state, CANCELLED)

def test_shutdown_goes_through_stopping_state(self):
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
Expand Down
55 changes: 21 additions & 34 deletions traits_futures/traits_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,10 @@ def submit(self, task):
background_task_wrapper = BackgroundTaskWrapper(
runner, sender, cancel_event.is_set
)
cf_future = self._worker_pool.submit(background_task_wrapper)
self._worker_pool.submit(background_task_wrapper)

future_wrapper = FutureWrapper(
future=future,
cf_future=cf_future,
receiver=receiver,
)
self._wrappers.add(future_wrapper)
Expand Down Expand Up @@ -375,41 +374,29 @@ def shutdown(self, *, timeout=None):
self._initiate_stop()
if self._internal_state == STOPPING:
self._unlink_tasks()
if self._wait_for_tasks(timeout):
self._terminate()
else:
raise RuntimeError(
"Shutdown timed out; "
"f{len(self._wrappers)} tasks still running"
)

# State transitions #######################################################
assert self._internal_state == _TERMINATING

def _wait_for_tasks(self, timeout):
"""
Wait for concurrent.futures futures associated to pending tasks.
if self._have_message_router:
# Route messages until either all futures are complete, or
# timeout.
try:
self._message_router.route_until(
lambda: not self._wrappers,
timeout=timeout,
)
except RuntimeError as exc:
# Re-raise with a more targeted error message.
raise RuntimeError(
"Shutdown timed out; "
"f{len(self._wrappers)} tasks still running"
) from exc

Returns
-------
success : bool
True if all background tasks completed within the given timeout.
False if some background tasks were still running at timeout.
"""
cf_futures = [wrapper.cf_future for wrapper in self._wrappers]
logger.debug(f"{self} waiting for {len(cf_futures)} background tasks")
done, not_done = concurrent.futures.wait(cf_futures, timeout=timeout)
logger.debug(
f"{self} done waiting: {len(done)} tasks completed, "
f"{len(not_done)} tasks still running"
)
assert not self._wrappers

# Remove wrappers for completed futures.
done_wrappers = {
wrapper for wrapper in self._wrappers if wrapper.cf_future in done
}
self._wrappers -= done_wrappers
self._terminate()

return not not_done
# State transitions #######################################################

def _stop_router(self):
"""
Expand Down Expand Up @@ -505,8 +492,6 @@ def _unlink_tasks(self):
State: STOPPING -> _TERMINATING
"""
if self._internal_state == STOPPING:
self._stop_router()
self._close_context()
self._internal_state = _TERMINATING
else:
raise _StateTransitionError(
Expand All @@ -522,6 +507,8 @@ def _terminate(self):
State: _TERMINATING -> STOPPED
"""
if self._internal_state == _TERMINATING:
self._stop_router()
self._close_context()
self._shutdown_worker_pool()
self._internal_state = STOPPED
else:
Expand Down
4 changes: 0 additions & 4 deletions traits_futures/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
These are used by the TraitsExecutor machinery.
"""

import concurrent.futures
import logging

from traits.api import Bool, HasStrictTraits, HasTraits, Instance, observe
Expand Down Expand Up @@ -57,9 +56,6 @@ class FutureWrapper(HasStrictTraits):
#: The Traits Futures future being wrapped
future = Instance(IFuture)

#: The concurrent.futures future associated to the background task.
cf_future = Instance(concurrent.futures.Future)

#: Object that receives messages from the background task.
receiver = Instance(HasTraits)

Expand Down

0 comments on commit 525fb31

Please sign in to comment.