Skip to content

Commit

Permalink
Decouple TraitsExecutor from future implementation (#396)
Browse files Browse the repository at this point in the history
* Experimental refactor

* Revert unrelated change that's been pulled out into another PR (#395)

* Replace _message trait with receive method

* To-do notes

* Have 'receive' return a boolean indicating whether a message is final

* More documentation word-smithing

* Docstring updates

* Fix an outdated comment
  • Loading branch information
mdickinson committed Jul 13, 2021
1 parent a25c8b6 commit 5957871
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 116 deletions.
36 changes: 23 additions & 13 deletions docs/source/guide/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ Traits Futures comes with three basic background task types: background calls,
background iterations and background progress calls, created via the
|submit_call|, |submit_iteration| and |submit_progress| functions,
respectively. In each case, communication from the background task to the
corresponding foreground |IFuture| instance is implemented by sending
custom task-type-specific messages, with the type of message identified by
a suitable string. For example, the background progress task sends messages
of type ``"progress"`` to report progress, while the background iteration task
sends messages of type ``"generated"``.
corresponding foreground |IFuture| instance is implemented by sending custom
task-type-specific messages of the form ``(message_type, message_value)``,
where ``message_type`` is a suitable string describing the type of the message.
For example, the progress task sends messages of type ``"progress"`` to report
progress, while the background iteration task sends messages of type
``"generated"``.

If none of the standard task types meets your needs, it's possible to write
your own background task type, that sends whatever messages you like. This
section describes how to do this in detail.
your own background task type, that sends whatever messages you like. Two base
classes, |BaseFuture| and |BaseTask|, are made available to make this easier.
This section describes how to do this in detail.

To create your own task type, you'll need three ingredients:

Expand All @@ -38,6 +40,9 @@ To create your own task type, you'll need three ingredients:
|ITaskSpecification|, and interrogates that instance to get the background
callable and the corresponding foreground future.

You may optionally also want to create a convenience function analogous to the
existing |submit_call|, |submit_iteration| and |submit_progress| functions.

Below we give a worked example that demonstrates how to create each of these
ingredients for a simple case.

Expand All @@ -54,11 +59,11 @@ is accompanied by the corresponding number.
Message types
~~~~~~~~~~~~~

In general, the message sent from the background task to the future can be any
In general, each message sent from the background task to the future can be any
Python object, and the future can interpret the sent object in any way that it
likes. However, the |BaseFuture| base class that we'll use below provides a
default dispatcher for messages, and that dispatcher expects those messages to
have the form ``(message_type, message_args)``. Here the message type should be
likes. However, the |BaseFuture| and |BaseTask| convenience base classes that
we'll use below provide helper functions to handle and dispatch messages of
the form ``(message_type, message_args)``. Here the message type should be
a string that's valid as a Python identifier, while the message argument can be
any Python object (though it should usually be pickleable and immutable).

Expand All @@ -76,8 +81,12 @@ Next, we define the callable that will be run in the background. This callable
must accept two arguments (which will be passed by position): ``send`` and
``cancelled``. The ``send`` object is a callable which will be used to send
messages to the foreground. The ``cancelled`` object is a zero-argument
callable which can be used to check for cancellation requests. Here's the
``fizz_buzz`` callable.
callable which can be used to check for cancellation requests. For convenience,
we inherit from |BaseTask|, which takes care of sending standard messages
to the future letting the future know that the background task has started,
stopped, or raised an exception.

Here's the ``fizz_buzz`` callable.

.. literalinclude:: examples/fizz_buzz_task.py
:start-after: start fizz_buzz
Expand Down Expand Up @@ -172,6 +181,7 @@ of the new background task type:
substitutions
.. |BaseFuture| replace:: :class:`~.BaseFuture`
.. |BaseTask| replace:: :class:`~.BaseTask`
.. |exception| replace:: :attr:`~traits_futures.i_future.IFuture.exception`
.. |HasStrictTraits| replace:: :class:`~traits.has_traits.HasStrictTraits`
.. |IFuture| replace:: :class:`~.IFuture`
Expand Down
41 changes: 23 additions & 18 deletions docs/source/guide/examples/fizz_buzz_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
# -- start fizz_buzz --
import time

from traits_futures.api import BaseTask

def fizz_buzz(send, cancelled):

class FizzBuzzTask(BaseTask):
"""
Count slowly from 1, sending FIZZ / BUZZ messages to the foreground.
Background task for Fizz Buzz
Counts slowly from 1, sending FIZZ / BUZZ messages to the foreground.
Parameters
----------
Expand All @@ -46,21 +50,22 @@ def fizz_buzz(send, cancelled):
returns ``True`` if cancellation has been requested, and ``False``
otherwise.
"""
n = 1
while not cancelled():

n_is_multiple_of_3 = n % 3 == 0
n_is_multiple_of_5 = n % 5 == 0

if n_is_multiple_of_3 and n_is_multiple_of_5:
send((FIZZ_BUZZ, n))
elif n_is_multiple_of_3:
send((FIZZ, n))
elif n_is_multiple_of_5:
send((BUZZ, n))

time.sleep(1.0)
n += 1
def run(self, send, cancelled):
n = 1
while not cancelled():

n_is_multiple_of_3 = n % 3 == 0
n_is_multiple_of_5 = n % 5 == 0

if n_is_multiple_of_3 and n_is_multiple_of_5:
send((FIZZ_BUZZ, n))
elif n_is_multiple_of_3:
send((FIZZ, n))
elif n_is_multiple_of_5:
send((BUZZ, n))

time.sleep(1.0)
n += 1
# -- end fizz_buzz --


Expand Down Expand Up @@ -132,7 +137,7 @@ def background_task(self):
callable can use ``send`` to send messages and ``cancelled`` to
check whether cancellation has been requested.
"""
return fizz_buzz
return FizzBuzzTask()
# -- end BackgroundFizzBuzz


Expand Down
4 changes: 3 additions & 1 deletion traits_futures/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
-----------------------------------------
- :class:`~.BaseFuture`
- :class:`~.BaseTask`
- :class:`~.ITaskSpecification`
Parallelism contexts
Expand Down Expand Up @@ -89,7 +90,7 @@
ProgressFuture,
submit_progress,
)
from traits_futures.base_future import BaseFuture
from traits_futures.base_future import BaseFuture, BaseTask
from traits_futures.ets_event_loop import ETSEventLoop
from traits_futures.executor_states import (
ExecutorState,
Expand Down Expand Up @@ -142,6 +143,7 @@
"submit_progress",
# Support for creating new task types
"BaseFuture",
"BaseTask",
"ITaskSpecification",
# Contexts
"IParallelContext",
Expand Down
6 changes: 3 additions & 3 deletions traits_futures/background_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
"""
from traits.api import Callable, Dict, HasStrictTraits, Str, Tuple

from traits_futures.base_future import BaseFuture
from traits_futures.base_future import BaseFuture, BaseTask
from traits_futures.i_task_specification import ITaskSpecification


class CallTask:
class CallTask(BaseTask):
"""
Wrapper around the actual callable to be run. This wrapper provides the
task that will be submitted to the concurrent.futures executor
Expand All @@ -28,7 +28,7 @@ def __init__(self, callable, args, kwargs):
self.args = args
self.kwargs = kwargs

def __call__(self, send, cancelled):
def run(self, send, cancelled):
return self.callable(*self.args, **self.kwargs)


Expand Down
6 changes: 3 additions & 3 deletions traits_futures/background_iteration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

from traits.api import Callable, Dict, Event, HasStrictTraits, Str, Tuple

from traits_futures.base_future import BaseFuture
from traits_futures.base_future import BaseFuture, BaseTask
from traits_futures.i_task_specification import ITaskSpecification

#: Message sent whenever the iteration yields a result.
#: The message argument is the result generated.
GENERATED = "generated"


class IterationTask:
class IterationTask(BaseTask):
"""
Iteration to be executed in the background.
"""
Expand All @@ -32,7 +32,7 @@ def __init__(self, callable, args, kwargs):
self.args = args
self.kwargs = kwargs

def __call__(self, send, cancelled):
def run(self, send, cancelled):
iterable = iter(self.callable(*self.args, **self.kwargs))

while True:
Expand Down
6 changes: 3 additions & 3 deletions traits_futures/background_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from traits.api import Callable, Dict, Event, HasStrictTraits, Str, Tuple

from traits_futures.base_future import BaseFuture
from traits_futures.base_future import BaseFuture, BaseTask
from traits_futures.i_task_specification import ITaskSpecification

# Message types for messages from ProgressTask
Expand Down Expand Up @@ -73,7 +73,7 @@ def report(self, progress_info):
self.send((PROGRESS, progress_info))


class ProgressTask:
class ProgressTask(BaseTask):
"""
Background portion of a progress background task.
Expand All @@ -86,7 +86,7 @@ def __init__(self, callable, args, kwargs):
self.args = args
self.kwargs = kwargs

def __call__(self, send, cancelled):
def run(self, send, cancelled):
progress = ProgressReporter(send=send, cancelled=cancelled)
try:
return self.callable(
Expand Down
106 changes: 106 additions & 0 deletions traits_futures/base_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Tuple,
)

from traits_futures.exception_handling import marshal_exception
from traits_futures.future_states import (
CANCELLABLE_STATES,
CANCELLED,
Expand All @@ -38,6 +39,32 @@
)
from traits_futures.i_future import IFuture

# Messages sent by the BaseTask, and interpreted by BaseFuture.

#: Custom message from the future. The argument is a pair
#: (message_type, message_args); the message type and message args
#: are interpreted by the future.
SENT = "sent"

#: Control message sent when the callable is abandoned before execution.
ABANDONED = "abandoned"

#: Control message sent before we start to process the target callable.
#: The argument is always ``None``.
STARTED = "started"

#: Control message sent when an exception was raised by the background
#: callable. The argument is a tuple containing exception information.
RAISED = "raised"

#: Control message sent to indicate that the background callable succeeded
#: and returned a result. The argument is that result.
RETURNED = "returned"

#: Message types that indicate a "final" message. After a message of this
#: type is received, no more messages will be received.
FINAL_MESSAGES = {ABANDONED, RAISED, RETURNED}

# The BaseFuture class maintains an internal state. That internal state maps to
# the user-facing state, but is more fine-grained, allowing the class to keep
# track of the internal consistency and invariants. For example, the
Expand Down Expand Up @@ -214,6 +241,29 @@ def cancel(self):
)
self._user_cancelled()

def receive(self, message):
"""
Receive and process a message from the task associated to this future.
This method is primarily for use by the executors, but may also be of
use in testing.
Parameters
----------
message : object
The message received from the associated task.
Returns
-------
final : bool
True if the received message should be the last one ever received
from the paired task.
"""
message_type, message_arg = message
method_name = "_task_{}".format(message_type)
getattr(self, method_name)(message_arg)
return message_type in FINAL_MESSAGES

# Semi-private methods ####################################################

# These methods represent the state transitions in response to external
Expand Down Expand Up @@ -460,3 +510,59 @@ def _update_property_traits(self, event):
new_done = new_internal_state in _DONE_INTERNAL_STATES
if old_done != new_done:
self.trait_property_changed("done", old_done, new_done)


class BaseTask:
"""
Mixin for background task classes, making those classes callable.
This class provides a callable wrapper allowing subclasses to easily
provide a background callable task.
Subclasses should override the ``run`` method to customize what should
happen when the task runs. This class's ``__call__`` implementation will
take care of sending standard control messages telling the future that the
task has started, completed, or raised, and delegate to the ``run`` method
for execution of the background task and sending of any custom messages.
"""

def run(send, cancelled):
"""
Run the body of the background task.
Parameters
----------
send : callable
single-argument callable used to send a message to the
associated future. It takes the message to be sent, and returns
no useful value.
cancelled : callable
zero-argument callable that can be used to check whether
cancellation has been requested for this task. Returns ``True``
if cancellation has been requested, else ``False``.
Returns
-------
any : object
May return any object. That object will be delivered to the
future's ``result`` attribute.
"""
raise NotImplementedError(
"This method should be implemented by subclasses."
)

def __call__(self, send, cancelled):
if cancelled():
send((ABANDONED, None))
return

send((STARTED, None))
try:
result = self.run(
lambda message: send((SENT, message)),
cancelled,
)
except BaseException as e:
send((RAISED, marshal_exception(e)))
else:
send((RETURNED, result))
20 changes: 20 additions & 0 deletions traits_futures/i_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,23 @@ def cancel(self):
If the task has already completed or cancellation has already
been requested.
"""

@abc.abstractmethod
def receive(self, message):
"""
Receive and process a message from the task associated to this future.
This method is primarily for use by the executors, but may also be of
use in testing.
Parameters
----------
message : object
The message received from the associated task.
Returns
-------
final : bool
True if the received message should be the last one ever received
from the paired task.
"""

0 comments on commit 5957871

Please sign in to comment.