Skip to content

Commit

Permalink
Refactor traits_executor (#381)
Browse files Browse the repository at this point in the history
* Switch from waiting for futures to manually running the router

* Remove now-unused machinery

* Update traits_futures/traits_executor.py

* Remove the 'unlink' step

* Refactor traits_executor

* Remove an unnecessary try/except
  • Loading branch information
mdickinson committed Jul 8, 2021
1 parent 68ed49f commit 767359e
Showing 1 changed file with 100 additions and 120 deletions.
220 changes: 100 additions & 120 deletions traits_futures/traits_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,19 +303,15 @@ def submit(self, task):
cancel_event = self._context.event()

sender, receiver = self._message_router.pipe()
try:
runner = task.background_task()
future = task.future()
future._executor_initialized(cancel_event.set)
except Exception:
self._message_router.close_pipe(receiver)
raise
runner = task.background_task()
future = task.future()

background_task_wrapper = BackgroundTaskWrapper(
runner, sender, cancel_event.is_set
)
self._worker_pool.submit(background_task_wrapper)

future._executor_initialized(cancel_event.set)
future_wrapper = FutureWrapper(
future=future,
receiver=receiver,
Expand All @@ -325,31 +321,16 @@ def submit(self, task):
logger.debug(f"{self} created future {future}")
return future

def stop(self):
"""
Initiate stop: cancel existing jobs and prevent new ones.
"""
if not self.running:
raise RuntimeError("Executor is not currently running.")

self._initiate_stop()
if not self._wrappers:
self._complete_stop()

def shutdown(self, *, timeout=None):
"""
Shut this executor down, abandoning all currently executing futures.
All currently executing futures that are cancellable will be cancelled.
Wait for all tasks to complete and then shut this executor down.
This method is blocking: it waits for associated background tasks
to complete, and if this executor owns its worker pool, it waits
for the worker pool to be shut down.
No further updates to a future's state will occur after this method
is called. In particular, any future that's cancelled by calling this
method will remain in CANCELLING state, and its state will never be
updated to CANCELLED.
All waiting or executing background tasks that are cancellable will be
cancelled, and then this executor will wait for all tasks to complete.
If a timeout is given and that timeout is reached before all tasks
complete, then :exc:`RuntimeError` will be raised and the executor will
remain in :data:`~.STOPPING` state. Otherwise, on return from this
method the executor will be in :data:`~.STOPPED` state
This method may be called at any time. If called on an executor
that's already stopped, this method does nothing.
Expand All @@ -364,94 +345,68 @@ def shutdown(self, *, timeout=None):
------
RuntimeError
If a timeout is given, and the background tasks fail to complete
within the given timeout. In this case the executor will remain
in STOPPING state.
within the given timeout.
"""
if self.stopped:
return

if self.running:
if self._internal_state == RUNNING:
self._initiate_stop()
if self._internal_state == STOPPING:
self._unlink_tasks()
self._initiate_manual_stop()

assert self._internal_state == _TERMINATING

if self._have_message_router:
# Route messages until either all futures are complete, or
# timeout.
try:
self._message_router.route_until(
lambda: not self._wrappers,
timeout=timeout,
)
except RuntimeError as exc:
# Re-raise with a more targeted error message.
# Re-raise with a more user-friendly error message.
raise RuntimeError(
"Shutdown timed out; "
"f{len(self._wrappers)} tasks still running"
) from exc

assert not self._wrappers
self._complete_stop()

self._terminate()

# State transitions #######################################################

def _stop_router(self):
def stop(self):
"""
Stop the message router.
Initiate stop: cancel existing jobs and prevent new ones.
"""
if self._have_message_router:
logger.debug(f"{self} stopping message router")
for wrapper in self._wrappers:
self._message_router.close_pipe(wrapper.receiver)
self._message_router.stop()
self._message_router = None
self._have_message_router = False
logger.debug(f"{self} message router stopped")
if not self.running:
raise RuntimeError("Executor is not currently running.")

def _close_context(self):
"""
Close the context, if we own it.
"""
if self._own_context:
logger.debug(f"{self} closing context")
self._context.close()
logger.debug(f"{self} context closed")
self._context = None
self._initiate_stop()

def _shutdown_worker_pool(self):
"""
Shut down the worker pool if we own it.
"""
if self._own_worker_pool:
logger.debug(f"{self} shutting down owned worker pool")
# The worker pool shutdown call is potentially blocking, but we
# should only ever reach this line when all the background tasks
# are complete, so in practice it should never block for long.
self._worker_pool.shutdown()
logger.debug(f"{self} worker pool is now shut down")
self._worker_pool = None
# If there are no tasks pending we can complete the stop immediately;
# otherwise, we check as each task completes using the observer below.
if not self._wrappers:
self._complete_stop()

def _cancel_tasks(self):
"""
Cancel all currently running tasks.
"""
logger.debug(f"{self} cancelling incomplete tasks")
cancel_count = 0
for wrapper in self._wrappers:
future = wrapper.future
if future.cancellable:
future.cancel()
cancel_count += 1
logger.debug(f"{self} cancelled {cancel_count} tasks")
@observe("_wrappers:items:done")
def _finalize_task_and_check_for_stop(self, event):
wrapper = event.object
self._message_router.close_pipe(wrapper.receiver)
self._wrappers.remove(wrapper)
logger.debug(
f"{self} future {wrapper.future} done ({wrapper.future.state})"
)
# If we're in STOPPING state and the last future has just exited,
# clean up and stop.
if self._internal_state == STOPPING:
if not self._wrappers:
self._complete_stop()

# State transitions #######################################################

def _initiate_stop(self):
"""
Prevent new tasks from being submitted and cancel existing tasks.
State: RUNNING -> STOPPING
Internal state: RUNNING -> STOPPING
"""
if self._internal_state == RUNNING:
self._cancel_tasks()
Expand All @@ -467,9 +422,15 @@ def _complete_stop(self):
"""
Move to stopped state when all remaining futures have completed.
State: STOPPING -> STOPPED
Internal state:
* STOPPING -> STOPPED
* _TERMINATING -> STOPPED
"""
if self._internal_state == STOPPING:
if self._internal_state in {STOPPING, _TERMINATING}:
# We should only get here once all futures have completed.
assert not self._wrappers

self._stop_router()
self._close_context()
self._shutdown_worker_pool()
Expand All @@ -481,15 +442,16 @@ def _complete_stop(self):
)
)

def _unlink_tasks(self):
def _initiate_manual_stop(self):
"""
Unlink background tasks from their corresponding futures.
Move into manual stopping mode (_TERMINATING internal state).
This doesn't stop the background tasks from executing, but after this
method is called, the corresponding futures will no longer receive any
state updates in response to messages sent by the background task.
This differs from the STOPPING internal state in its handling of
completed futures: an executor in STOPPING state moves to STOPPED state
automatically when the last task completes. An executor in _TERMINATING
state must complete the stop manually.
State: STOPPING -> _TERMINATING
Internal state: STOPPING -> _TERMINATING
"""
if self._internal_state == STOPPING:
self._internal_state = _TERMINATING
Expand All @@ -500,23 +462,54 @@ def _unlink_tasks(self):
)
)

def _terminate(self):
# Private methods #########################################################

def _cancel_tasks(self):
"""
Cancel all currently running tasks.
"""
Complete executor shutdown.
logger.debug(f"{self} cancelling incomplete tasks")
cancel_count = 0
for wrapper in self._wrappers:
future = wrapper.future
if future.cancellable:
future.cancel()
cancel_count += 1
logger.debug(f"{self} cancelled {cancel_count} tasks")

State: _TERMINATING -> STOPPED
def _stop_router(self):
"""
if self._internal_state == _TERMINATING:
self._stop_router()
self._close_context()
self._shutdown_worker_pool()
self._internal_state = STOPPED
else:
raise _StateTransitionError(
"Unexpected state transition in internal state {!r}".format(
self._internal_state
)
)
Stop the message router.
"""
if self._have_message_router:
logger.debug(f"{self} stopping message router")
self._message_router.stop()
self._message_router = None
self._have_message_router = False
logger.debug(f"{self} message router stopped")

def _close_context(self):
"""
Close the context, if we own it.
"""
if self._own_context:
logger.debug(f"{self} closing context")
self._context.close()
logger.debug(f"{self} context closed")
self._context = None

def _shutdown_worker_pool(self):
"""
Shut down the worker pool, if we own it.
"""
if self._own_worker_pool:
logger.debug(f"{self} shutting down owned worker pool")
# The worker pool shutdown call is potentially blocking, but we
# should only ever reach this line when all the background tasks
# are complete, so in practice it should never block for long.
self._worker_pool.shutdown()
logger.debug(f"{self} worker pool is now shut down")
self._worker_pool = None

# Private traits ##########################################################

Expand Down Expand Up @@ -606,16 +599,3 @@ def __context_default(self):
context = MultithreadingContext()
self._own_context = True
return context

@observe("_wrappers:items:done")
def _untrack_future(self, event):
wrapper = event.object
self._message_router.close_pipe(wrapper.receiver)
self._wrappers.remove(wrapper)
logger.debug(
f"{self} future {wrapper.future} done ({wrapper.future.state})"
)
# If we're in STOPPING state and the last future has just exited,
# go to STOPPED state.
if self._internal_state == STOPPING and not self._wrappers:
self._complete_stop()

0 comments on commit 767359e

Please sign in to comment.