Skip to content

Commit

Permalink
Merge pull request #124 from dls-controls/task_changes
Browse files Browse the repository at this point in the history
Task changes
  • Loading branch information
coretl committed Aug 15, 2016
2 parents 34b6111 + 1f18ccd commit 630ed94
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 159 deletions.
4 changes: 2 additions & 2 deletions malcolm/controllers/defaultcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class DefaultController(Controller):
def disable(self):
try:
self.transition(sm.DISABLING, "Disabling")
self.Disabling.run(self)
self.run_hook(self.Disabling)
self.transition(sm.DISABLED, "Done Disabling")
except Exception as e: # pylint:disable=broad-except
self.log_exception("Fault occurred while Disabling")
Expand All @@ -27,7 +27,7 @@ def disable(self):
def reset(self):
try:
self.transition(sm.RESETTING, "Resetting")
self.Resetting.run(self)
self.run_hook(self.Resetting)
self.transition(sm.AFTER_RESETTING, "Done Resetting")
except Exception as e: # pylint:disable=broad-except
self.log_exception("Fault occurred while Resetting")
Expand Down
94 changes: 85 additions & 9 deletions malcolm/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from malcolm.core.attribute import Attribute
from malcolm.core.block import Block
from malcolm.core.blockmeta import BlockMeta
from malcolm.core.hook import Hook
from malcolm.core.hook import Hook, get_hook_decorated
from malcolm.core.loggable import Loggable
from malcolm.core.methodmeta import method_takes, method_only_in, MethodMeta, \
from malcolm.core.methodmeta import method_takes, MethodMeta, \
get_method_decorated
from malcolm.core.request import Post
from malcolm.core.statemachine import DefaultStateMachine
from malcolm.core.task import Task
from malcolm.core.vmetas import BooleanMeta, ChoiceMeta, StringMeta
from malcolm.core.request import Post


sm = DefaultStateMachine
Expand Down Expand Up @@ -44,18 +45,38 @@ def __init__(self, block_name, process, parts=None, params=None):
# dictionary of dictionaries
# {state (str): {MethodMeta: writeable (bool)}
self.methods_writeable = {}
if parts is None:
parts = {}
self.parts = parts
for part_name, part in self.parts.items():
part.set_logger_name("%s.%s" % (controller_name, part_name))

# dict {hook: name}
self.hook_names = self._find_hooks()
self.parts = self._setup_parts(parts, controller_name)
self._set_block_children()
self._do_transition(sm.DISABLED, "Disabled")
self.block.set_parent(process, block_name)
process.add_block(self.block)
self.do_initial_reset()

def _find_hooks(self):
hook_names = {}
for n in dir(self):
attr = getattr(self, n)
if isinstance(attr, Hook):
assert attr not in hook_names, \
"Hook %s already in controller as %s" % (
n, hook_names[attr])
hook_names[attr] = n
return hook_names

def _setup_parts(self, parts, controller_name):
if parts is None:
parts = {}
for part_name, part in parts.items():
part.set_logger_name("%s.%s" % (controller_name, part_name))
# Check part hooks into one of our hooks
for func_name, part_hook, _ in get_hook_decorated(part):
assert part_hook in self.hook_names, \
"Part %s func %s not hooked into %s" % (
part, func_name, self)
return parts

def do_initial_reset(self):
request = Post(
None, self.process.create_queue(), [self.block_name, "reset"])
Expand Down Expand Up @@ -194,3 +215,58 @@ def register_method_writeable(self, method, states):
is_writeable = state in states
writeable_dict[method] = is_writeable

def create_part_tasks(self):
part_tasks = {}
for part_name, part in self.parts.items():
part_tasks[part] = Task("Task(%s)" % part_name, self.process)
return part_tasks

def run_hook(self, hook, part_tasks=None):
assert hook in self.hook_names, \
"Hook %s doesn't appear in controller hooks %s" % (
hook, self.hook_names)
self.log_debug("Running %s hook", self.hook_names[hook])

if part_tasks is None:
part_tasks = self.create_part_tasks()

# ask the hook to find the functions it should run
func_tasks = hook.find_func_tasks(part_tasks)

# now start them off
hook_queue = self.process.create_queue()

def _gather_task_return_value(func, task):
try:
result = func(task)
except Exception as e: # pylint:disable=broad-except
hook_queue.put((func, e))
else:
hook_queue.put((func, result))

for func, task in func_tasks.items():
task.define_spawn_function(_gather_task_return_value, func, task)
task.start()

# Create the reverse dictionary so we know where to store the results
task_part_names = {}
for part_name, part in self.parts.items():
if part in part_tasks:
task_part_names[part_tasks[part]] = part_name

# Wait for them all to finish
return_dict = {}
while func_tasks:
func, ret = hook_queue.get()
task = func_tasks.pop(func)
part_name = task_part_names[task]
return_dict[part_name] = ret
if isinstance(ret, Exception):
# Stop all other tasks
for task in func_tasks.values():
task.stop()
for task in func_tasks.values():
task.wait()
raise

return return_dict
68 changes: 14 additions & 54 deletions malcolm/core/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,21 @@ def __call__(self, func):
func.Hook = self
return func

def run(self, controller):
"""
Run all relevant functions for this Hook
Args:
controller(Controller): Controller who's parts' functions will be run
"""

names = [n for n in dir(controller) if getattr(controller, n) is self]
assert len(names) > 0, \
"Hook is not in controller"
assert len(names) == 1, \
"Hook appears in controller multiple times as %s" % names

task_queue = controller.process.create_queue()

spawned_list = []
active_tasks = []
for pname, part in controller.parts.items():
members = [value[1] for value in
inspect.getmembers(part, predicate=inspect.ismethod)]
def find_func_tasks(self, part_tasks):
func_tasks = {}

for function in members:
if hasattr(function, "Hook") and function.Hook == self:
task = Task("%s.%s" % (names[0], pname), controller.process)
spawned_list.append(controller.process.spawn(
self._run_func, task_queue, function, task))
active_tasks.append(task)
# Filter part tasks so that we only run the ones hooked to us
for part, task in part_tasks.items():
for func_name, part_hook, func in get_hook_decorated(part):
if part_hook is self:
assert func not in func_tasks, \
"Function %s is second defined for a hook" % func_name
func_tasks[func] = task

while active_tasks:
task, response = task_queue.get()
active_tasks.remove(task)
return func_tasks

if isinstance(response, Exception):
for task in active_tasks:
task.stop()
for spawned in spawned_list:
spawned.wait()

raise response

@staticmethod
def _run_func(q, func, task):
"""
Run a function and place the response or exception back on the queue
Args:
q(Queue): Queue to place response/exception raised on
func: Function to run
task(Task): Task to run function with
"""

try:
result = func(task)
except Exception as e: # pylint:disable=broad-except
q.put((task, e))
else:
q.put((task, result))
def get_hook_decorated(part):
for name, member in inspect.getmembers(part, inspect.ismethod):
if hasattr(member, "Hook"):
yield name, member.Hook, member
11 changes: 6 additions & 5 deletions malcolm/core/spawnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ def start(self, process=None):
if process is None:
process = self.process
self._initialize()
for (func, _) in self._spawn_functions:
self._spawned.append(process.spawn(func))
for (func, args, _) in self._spawn_functions:
assert func is not None, "Spawned function is None"
self._spawned.append(process.spawn(func, *args))

def stop(self):
"""Call registered stop functions"""

self._initialize()
for (_, stop_func) in reversed(self._spawn_functions):
for (_, _, stop_func) in reversed(self._spawn_functions):
if stop_func is not None:
stop_func()

Expand All @@ -42,15 +43,15 @@ def wait(self, timeout=None):
spawned.wait(timeout=timeout)
self._spawned = []

def add_spawn_function(self, func, stop_func=None):
def add_spawn_function(self, func, stop_func=None, *args):
"""Register functions to be triggered by self.start and self.stop
Args:
func: function to be spawned
stop_func: function to halt the spawned function (default None)
"""
self._initialize()
self._spawn_functions.append((func, stop_func))
self._spawn_functions.append((func, args, stop_func))

def make_default_stop_func(self, q):
"""Convenience function for creating a default stop function that puts
Expand Down
21 changes: 13 additions & 8 deletions malcolm/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
from malcolm.core.methodmeta import MethodMeta
from malcolm.core.request import Subscribe, Unsubscribe, Post, Put
from malcolm.core.response import Error, Return, Update
from malcolm.core.spawnable import Spawnable


class Task(Loggable):
class Task(Loggable, Spawnable):
"""Provides a mechanism for executing commands and setting or monitoring
attributes on blocks. Note that queue handling is executed in the
caller's thread (by calling wait_all). Hence this module is not
thread safe"""
# TODO: when Task object is destroyed we need to cancel all subscriptions
# Sentinel object that when received stops the recv_loop
TASK_STOP = object()

def __init__(self, name, process):
self.set_logger_name(name)
Expand All @@ -23,6 +22,8 @@ def __init__(self, name, process):
self._next_id = 0
self._futures = {} # dict {int id: Future}
self._subscriptions = {} # dict {int id: (endpoint, func, args)}
# For testing, make it so start() will raise, but stop() works
self.define_spawn_function(None)

def _save_future(self, future):
""" stores the future with unique id"""
Expand Down Expand Up @@ -177,10 +178,6 @@ def unsubscribe(self, id_):
request.set_id(id_)
self.process.q.put(request)

def stop(self):
"""Puts an abort on the queue"""
self.q.put(Task.TASK_STOP)

def wait_all(self, futures, timeout=None):
"""services all futures until the list 'futures' are all done
then returns. Calls relevant subscription callbacks as they
Expand All @@ -200,7 +197,7 @@ def wait_all(self, futures, timeout=None):
self.log_debug("wait_all awaiting response ...")
response = self.q.get(True, timeout)
self.log_debug("wait_all received response %s", response)
if response is Task.TASK_STOP:
if response is Spawnable.STOP:
raise RuntimeWarning("Task aborted")
elif response.id in self._futures:
f = self._update_future(response)
Expand Down Expand Up @@ -250,3 +247,11 @@ def _invoke_callback(self, response):
raise ValueError(
"Subscription received unexpected response: %s" % response)
return ret_val

def define_spawn_function(self, func, *args):
self._initialize()
if len(self._spawned) > 0:
raise AssertionError("Spawned functions are still running")
self._spawn_functions = []
self.add_spawn_function(
func, self.make_default_stop_func(self.q), *args)
4 changes: 4 additions & 0 deletions malcolm/imalcolm.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def main():
or
self.gui(self.make_client("counter"))
or
self.process.process_block.blocks
"""
try:
import IPython
Expand Down
5 changes: 3 additions & 2 deletions tests/test_controllers/test_defaultcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def test_init(self):
)

self.assertEqual(expected, self.c.methods_writeable)
self.assertEqual(self.c.hook_names, {
self.c.Resetting: "Resetting", self.c.Disabling: "Disabling"})

def test_transition(self):
self.c.reset()
Expand All @@ -80,8 +82,7 @@ def test_transition_raises(self):
self.c.transition("Configuring", "Attempting to configure scan...")

def test_reset_fault(self):
self.c.Resetting = MagicMock()
self.c.Resetting.run.side_effect = ValueError("boom")
self.c.run_hook = MagicMock(side_effect = ValueError("boom"))
with self.assertRaises(ValueError):
self.c.reset()
self.b["busy"].set_value.assert_has_calls(
Expand Down
8 changes: 8 additions & 0 deletions tests/test_core/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ def test_set_writeable_methods(self):
self.c.register_method_writeable(m, "Ready")
self.assertEqual(self.c.methods_writeable['Ready'][m], True)

def test_run_hook(self):
# TODO: write this
pass

def test_run_hook_raises(self):
# TODO: write this
pass

if __name__ == "__main__":
unittest.main(verbosity=2)

0 comments on commit 630ed94

Please sign in to comment.