Skip to content

Commit

Permalink
added classes Task and Future
Browse files Browse the repository at this point in the history
  • Loading branch information
gilesknap committed Jul 18, 2016
1 parent 61c92b1 commit 8f71a26
Show file tree
Hide file tree
Showing 13 changed files with 699 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/arch/hierarchy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ example, a position plugin might look like this::
task.put(pos.xml, xml)
self._loaded = 100

def _load_pos(self, device, positions):
def _load_pos(self, positions, device):
pos = self.child
if positions < 100 and self._loaded < device.totalSteps:
# add up to 100 more positions
Expand Down
8 changes: 6 additions & 2 deletions malcolm/compat.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
try:
from queue import Queue # noqa
# python 2
import Queue as queue # noqa
except ImportError:
from Queue import Queue # noqa
# python 3
import queue # noqa

try:
# python 2
base_string = basestring
except NameError:
# python 3
base_string = str
10 changes: 8 additions & 2 deletions malcolm/core/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,16 @@ def handle_request(self, request):
"Expected Post or Put request, received %s" % request.typeid
with self.lock:
if isinstance(request, Post):
method_name = request.endpoint[-1]
if len(request.endpoint) != 2:
raise ValueError("POST endpoint requires 2 part endpoint")
method_name = request.endpoint[1]
response = self.methods[method_name].get_response(request)
elif isinstance(request, Put):
attr_name = request.endpoint[-1]
attr_name = request.endpoint[1]
if len(request.endpoint) != 3:
raise ValueError("PUT endpoint requires 3 part endpoint")
assert request.endpoint[2] == "value", \
"Can only put to an attribute value"
self.attributes[attr_name].put(request.value)
self.attributes[attr_name].set_value(request.value)
response = Return(request.id_, request.context)
Expand Down
112 changes: 112 additions & 0 deletions malcolm/core/future.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@

# Possible future states (for internal use).
RUNNING = 'RUNNING'
# Task has set the return or exception and this future is filled
FINISHED = 'FINISHED'

_FUTURE_STATES = [
RUNNING,
FINISHED
]

class Error(Exception):
"""Base class for all future-related exceptions."""
# TODO: for review - user-defined exceptions - should we use them?
pass


class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass


class RemoteError(Error):
"""The remote operation generated an error."""
pass


class Future(object):
"""Represents the result of an asynchronous computation.
This class has a similar API to concurrent.futures.Future but this
simpler version is not thread safe"""

def __init__(self, task):
"""Initializes the future """
self._task = task
self._state = RUNNING
self._result = None
self._exception = None

def done(self):
"""Return True if the future finished executing."""
return self._state in [FINISHED]

def __get_result(self):
if self._exception:
raise RemoteError(self._exception)
else:
return self._result

def result(self, timeout=None):
"""Return the result of the call that the future represents.
Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
if self._state == FINISHED:
return self.__get_result()

self._task.wait_all(self, timeout)

return self.__get_result()

def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Args:
timeout: The number of seconds to wait for the exception if the
future isn't done. If None, then there is no limit on the wait
time.
Returns:
The exception raised by the call that the future represents or None
if the call completed without raising.
Raises:
TimeoutError: If the future didn't finish executing before the given
timeout.
"""

if self._state == FINISHED:
return self._exception

self._task.wait_all(self, timeout)

return self._exception

# The following methods should only be used by Task and in unit tests.

def set_result(self, result):
"""Sets the return value of work associated with the future.
Should only be used by Task and unit tests.
"""
self._result = result
self._state = FINISHED

def set_exception(self, exception):
"""Sets the result of the future as being the given exception.
Should only be used by Task and unit tests.
"""
self._exception = exception
self._state = FINISHED
8 changes: 1 addition & 7 deletions malcolm/core/hook.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import inspect

from malcolm.core.loggable import Loggable
# from malcolm.core.task import Task


class Task(object):

def __init__(self, process):
self.process = process
from malcolm.core.task import Task


class Hook(Loggable):
Expand Down
4 changes: 2 additions & 2 deletions malcolm/core/syncfactory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from threading import Lock
from multiprocessing.pool import ThreadPool

from malcolm.compat import Queue
from malcolm.compat import queue
from malcolm.core.loggable import Loggable


Expand Down Expand Up @@ -32,7 +32,7 @@ def spawn(self, function, *args, **kwargs):

def create_queue(self):
"""Creates a new Queue object"""
return Queue()
return queue.Queue()

def create_lock(self):
"""Creates a new simple Lock object"""
Expand Down

0 comments on commit 8f71a26

Please sign in to comment.