Skip to content

Commit

Permalink
Feature: executor.shutdown (#334)
Browse files Browse the repository at this point in the history
* Attempt to add a 'shutdown' method to the TraitsExecutor

* Fix Pinger / Pingee lifetime issues

* Style fix to prevent black from reformatting

* Remove debugging notes

* Update docstrings for accuracy.

* Implement shutdown

* Another TODO: fix missing test.

* Don't call stop on a STOPPING executor

* Revert changes unrelated to this PR

* Revert more changes unrelated to this PR (and moved into #335)

* Remove unused import

* Tidying: remove out-of-date comment, spurious blank line

* Fix use of timeout, add a test

* Remove TODO comments (transferred to the PR description)

* Comment out timeout test for now; add timeouts to other shutdown calls

* Move executor states to their own module

* Fix up documentation for move of executor states

* Add internal states to TraitsExecutor

* Rework state transitions

* Fix test_shutdown_timeout

* Allow shutdown to be called from any state

* Refactor to remove duplication and an unnecessary internal state

* update docstring for long_running_task

* Remove redundant wait call, clarify workings of test

* Bring in changes from #334

* Remove code specific to the new 'shutdown' method

* Fix references to RUNNING, STOPPING and STOPPED in the documentation

* Update user guide to explain shutdown

* Add explanatory comment to worker pool shutdown

* Fix misleading method name

* Update a docstring to follow NumPyDoc style

* Minor code cleanup in _wait_for_tasks

* Close the context at the same time as shutting down the router

* Improve description of the new internal state

* Language change: talk about unlinking rather than disconnecting

* Separate methods for closing context and stopping router

* Add note about testing

* Documentation wording improvements

* Updates to match #334

* Fix copy-and-paste bug in logging message

* Fix logic in 'stopped' check

* Fix inconsistent log message: all messages now include {self}

* Make the shutdown argument keyword-only
  • Loading branch information
mdickinson committed Jul 6, 2021
1 parent f65964a commit bfb5e0c
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 19 deletions.
103 changes: 86 additions & 17 deletions docs/source/guide/intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -253,31 +253,98 @@ occurs after the |cancel| call.
Stopping the executor
---------------------

Like the various future classes, a |TraitsExecutor| also has a state trait, of
type |ExecutorState|. This state is one of the following:
To avoid unexpected side-effects during Python process finalization, it's
recommended to shut down a running |TraitsExecutor| explicitly prior to process
exit. Similarly, when writing a unit test that makes use of a |TraitsExecutor|,
that executor should be shut down at test exit, to avoid potential for
unexpected interactions with other tests.

This section describes the two methods available for executor shutdown:
|shutdown| and |stop|.

Executor states
~~~~~~~~~~~~~~~

Like the various future classes, a |TraitsExecutor| also has a |state| trait,
of type |ExecutorState|. This state is one of the following:

|RUNNING|
The executor is running and accepting task submissions.
The executor is running and accepting task submissions. This is the state
of a newly-created executor.
|STOPPING|
The user has requested that the executor stop, but there are still
running futures associated with this executor. An executor in |STOPPING|
state will not accept new task submissions.
Shutdown has been initiated or partially completed, but there are still
running background tasks associated with this executor. An executor in
|STOPPING| state will not accept new task submissions.
|STOPPED|
The executor has stopped, and all futures associated with this
executor have finished. An executor in this state cannot be
used to submit new tasks, and cannot be restarted.
The executor has stopped, all resources associated with the executor have
been released, and all background tasks associated with this executor have
finished. An executor in |STOPPED| state will not accept new task
submissions, and cannot be restarted.

Executor shutdown
~~~~~~~~~~~~~~~~~

Once a |TraitsExecutor| object is no longer needed (for example at application
shutdown time), its |stop| method may be called. This cancels all current
executing or waiting futures, puts the executor into |STOPPING| state and then
returns.
shutdown time), it can be shut down via its |shutdown| method. This method is
blocking: it waits for all of the background tasks to complete before
returning. In more detail, if called on a running executor, the |shutdown|
method performs the following tasks, in order:

* Moves the executor to |STOPPING| state.
* Requests cancellation of all waiting or executing background tasks.
* Unlinks all background tasks from their associated futures: the
futures will receive no further updates after |shutdown| returns.
* Waits for all background tasks to complete.
* Shuts down the worker pool (if that worker pool is owned by the executor).
* Moves the executor to |STOPPED| state.

If called on an executor in |STOPPED| state, |shutdown| simply returns
without taking any action. If called on an executor in |STOPPING| state,
any of the above actions that have not already been taken will be taken.

Note that because of the unlinking of the background tasks and their
associated futures, background tasks that have been cancelled will leave their
associated futures in |CANCELLING| state. Those futures will never reach
|CANCELLED| state, even under a running event loop.

Shutdown with a timeout
~~~~~~~~~~~~~~~~~~~~~~~

To avoid blocking indefinitely, the |shutdown| method also accepts a
``timeout`` parameter. That timeout is used when waiting for the background
tasks to complete. If the background tasks fail to complete within the given
timeout, |shutdown| will raise |RuntimeError| and leave the executor in
|STOPPING| state. The worker pool used by the executor will not have been shut
down.

Non-blocking executor shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Occasionally, it may be desirable to shut down an executor during normal
application execution, rather than at application shutdown time. In this
situation calling |shutdown| is problematic, since that method is blocking and
so will make the GUI unresponsive. Instead, users can call the non-blocking
|stop| method. This method:

* Moves the executor to |STOPPING| state.
* Requests cancellation of all waiting or executing background tasks.

Typically, the event loop will continue to run after calling the |stop| method.
Under that running event loop, all futures will eventually reach one of the
final states (|COMPLETED|, |FAILED| or |CANCELLED|). When that happens, the
system automatically:

* Shuts down the worker pool (if that worker pool is owned by the executor).
* Moves the executor to |STOPPED| state.

Once all futures reach |CANCELLED| state, an executor in |STOPPING| state moves
into |STOPPED| state. If the executor owns its worker pool, that worker pool is
shut down immediately before moving into |STOPPED| state.
If there are no waiting or executing background tasks, then |stop| goes
through all of the steps above at once, moving the executor through
the |STOPPING| state to |STOPPED| state.

It's advisable to stop the executor explicitly and wait for it to reach
|STOPPING| state before exiting an application using it.
Note that while |stop| can only be called on an executor in |RUNNING| state,
it's always legal to call |shutdown| on an executor, regardless of the current
state of that executor. In particular, calling |shutdown| after |stop| is
permissible, but calling |stop| after |shutdown| would be an error.


Using a shared worker pool
Expand All @@ -303,8 +370,10 @@ needed.
.. |traits_futures.api| replace:: :mod:`traits_futures.api`

.. |TraitsExecutor| replace:: :class:`~traits_futures.traits_executor.TraitsExecutor`
.. |shutdown| replace:: :meth:`~traits_futures.traits_executor.TraitsExecutor.shutdown`
.. |stop| replace:: :meth:`~traits_futures.traits_executor.TraitsExecutor.stop`

.. |state| replace:: :attr:`~traits_futures.traits_executor.TraitsExecutor.state`
.. |ExecutorState| replace:: :meth:`~traits_futures.executor_states.ExecutorState`
.. |RUNNING| replace:: :data:`~traits_futures.executor_states.RUNNING`
.. |STOPPING| replace:: :data:`~traits_futures.executor_states.STOPPING`
Expand Down
93 changes: 93 additions & 0 deletions traits_futures/tests/traits_executor_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import contextlib
import queue
import threading
import time

from traits.api import (
Bool,
Expand Down Expand Up @@ -62,6 +63,18 @@ def test_progress(arg1, arg2, kwd1, kwd2, progress):
return arg1, arg2, kwd1, kwd2


def slow_call(starting, stopping):
"""Target background task used to check waiting behaviour of 'shutdown'.
Parameters
----------
starting, stopping : threading.Event
"""
starting.set()
time.sleep(0.1)
stopping.set()


def wait_for_event(started, event, timeout):
"""Wait for an event, and raise if it doesn't occur within a timeout.
Expand Down Expand Up @@ -170,6 +183,68 @@ def test_stop_method_raises_unless_running(self):
with self.assertRaises(RuntimeError):
self.executor.stop()

def test_shutdown_when_already_stopping(self):
with self.long_running_task(self.executor):
self.assertEqual(self.executor.state, RUNNING)
self.executor.stop()

self.assertEqual(self.executor.state, STOPPING)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(self.executor.state, STOPPED)

def test_shutdown_does_nothing_if_stopped(self):
self.assertEqual(self.executor.state, RUNNING)
self.executor.stop()
self.wait_until_stopped(self.executor)
self.assertEqual(self.executor.state, STOPPED)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(self.executor.state, STOPPED)

def test_shutdown_cancels_running_futures(self):
future = submit_call(self.executor, pow, 3, 5)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(future.state, CANCELLING)
self.assertTrue(self.executor.stopped)

def test_no_future_updates_after_shutdown(self):
future = submit_call(self.executor, pow, 3, 5)
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(future.state, CANCELLING)
self.exercise_event_loop()
self.assertEqual(future.state, CANCELLING)

def test_shutdown_goes_through_stopping_state(self):
self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(
self.listener.states,
[RUNNING, STOPPING, STOPPED],
)

def test_shutdown_waits_for_background_tasks(self):
starting = self._context.event()
stopping = self._context.event()
submit_call(self.executor, slow_call, starting, stopping)

# Make sure background task has started, else it might be
# cancelled altogether.
self.assertTrue(starting.wait(timeout=SAFETY_TIMEOUT))

self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertTrue(stopping.is_set())

def test_shutdown_timeout(self):
start_time = time.monotonic()
with self.long_running_task(self.executor):
with self.assertRaises(RuntimeError):
self.executor.shutdown(timeout=0.1)

actual_timeout = time.monotonic() - start_time
self.assertLess(actual_timeout, 1.0)
self.assertEqual(self.executor.state, STOPPING)

self.executor.shutdown(timeout=SAFETY_TIMEOUT)
self.assertEqual(self.executor.state, STOPPED)

def test_cant_submit_new_unless_running(self):
with self.long_running_task(self.executor):
self.executor.stop()
Expand Down Expand Up @@ -390,6 +465,24 @@ def target(executor, msg_queue):

# Helper methods and assertions ###########################################

def exercise_event_loop(self):
"""
Exercise the event loop.
Places a new task on the event loop and runs the event loop
until that task is complete. The goal is to flush out any other
tasks that might already be in event loop tasks queue.
"""

class Sentinel(HasStrictTraits):
#: Simple boolean flag.
flag = Bool(False)

sentinel = Sentinel()

self._event_loop_helper.setattr_soon(sentinel, "flag", True)
self.run_until(sentinel, "flag", lambda sentinel: sentinel.flag)

def wait_until_stopped(self, executor):
"""
Wait for the executor to reach STOPPED state.
Expand Down

0 comments on commit bfb5e0c

Please sign in to comment.