Skip to content

Commit

Permalink
Merge pull request #77 from dls-controls/process_block
Browse files Browse the repository at this point in the history
Process block
  • Loading branch information
c-mita committed Jul 1, 2016
2 parents dba6024 + 2518a3e commit 82aa6a7
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 203 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ testpublish:
$(PYTHON) setup.py register -r https://testpypi.python.org/pypi sdist upload -r https://testpypi.python.org/pypi

test:
$(PYTHON) setup.py test
PYTHONPATH=../scanpointgenerator $(PYTHON) setup.py test

docs/html/index.html: $(wildcard docs/*.rst docs/*/*.rst docs/conf.py)
sphinx-build -b html docs docs/html
Expand Down
77 changes: 48 additions & 29 deletions malcolm/controllers/clientcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,64 @@
from malcolm.core.controller import Controller
from malcolm.core.request import Request
from malcolm.core.method import Method
from malcolm.core.serializable import Serializable


class ClientController(Controller):
"""Sync a local block with a given remote block"""
REMOTE_BLOCKS_ID = 0
BLOCK_ID = 1

def __init__(self, process, block, client_comms):
def __init__(self, process, block):
"""
Args:
process (Process): The process this should run under
block (Block): The local block we should be controlling
client_comms (ClientComms): Should be already connected to a server
hosting the remote block
"""
self.q = process.create_queue()
self.client_comms = client_comms
# Call this last as it calls create_methods
super(ClientController, self).__init__(block=block)
self.process = process
request = Request.Subscribe(
None, self, [process.name, "remoteBlocks", "value"])
request.set_id(self.REMOTE_BLOCKS_ID)
self.process.q.put(request)

def create_methods(self):
"""Get methods from remote block and mirror them internally"""
request = Request.Get(None, self.q, [self.block.name])
self.client_comms.q.put(request)
self.log_debug("Waiting for response to Get %s", self.block.name)
response = self.q.get()
# Find all the methods
for aname, amap in response.value.items():
# TODO: If it has "takes" it's a method, flaky...
if "takes" in amap:
yield self.wrap_method(aname, amap)
def put(self, response):
"""We don't have a queue as no thread to service, but act like one"""
if response.id_ == self.REMOTE_BLOCKS_ID:
if response.value and self.block.name in response.value:
# process knows how to get to a block
self._subscribe_to_block(self.block.name)
elif response.id_ == self.BLOCK_ID:
with self.block.lock:
self.log_debug(response)
for change in response.changes:
if change[0] == []:
# update root
self._regenerate_block(change[1])
else:
# just pass it to the block to handle
self.block.update(change)

def wrap_method(self, method_name, method_map):
"""Take the serialized method map and create a Method from it
def _regenerate_block(self, d):
children = []
for k, v in d.items():
if k == "typeid":
continue
child = Serializable.from_dict(k, v)
children.append(child)
if isinstance(child, Method):
# calling method forwards to server
child.set_function(
functools.partial(self.call_server_method, k))
self.block.replace_children(children)

Args:
method_map (dict): Serialized Method
"""
method = Method.from_dict(method_name, method_map)
method.set_function(
functools.partial(self.call_server_method, method_name))
self.log_debug("Wrapping method %s", method_name)
return method
def _subscribe_to_block(self, block_name):
self.client_comms = self.process.get_client_comms(block_name)
assert self.client_comms, \
"Process doesn't know about block %s" % block_name
request = Request.Subscribe(None, self, [block_name], delta=True)
request.set_id(self.BLOCK_ID)
self.client_comms.q.put(request)

def call_server_method(self, method_name, parameters, returns):
"""Call method_name on the server
Expand All @@ -54,10 +71,12 @@ def call_server_method(self, method_name, parameters, returns):
returns (Map): Returns map to fill and return
"""
self.log_debug(dict(parameters))
request = Request.Post(None, self.q,
q = self.process.create_queue()
request = Request.Post(None, q,
[self.block.name, method_name], parameters)
self.client_comms.q.put(request)
response = self.q.get()
with self.block.lock_released():
response = q.get()
assert response.type_ == response.RETURN, \
"Expected Return, got %s" % response.type_
returns.update(response.value)
Expand Down
107 changes: 66 additions & 41 deletions malcolm/core/block.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
from collections import OrderedDict
from contextlib import contextmanager

from malcolm.core.serializable import Serializable
from malcolm.core.request import Request
from malcolm.core.response import Response
from malcolm.core.attribute import Attribute
from malcolm.core.method import Method

@contextmanager
def dummy_lock():
yield
class DummyLock(object):

def acquire(self):
pass

def release(self):
pass

def __enter__(self):
self.acquire()

def __exit__(self, type, value, traceback):
self.release()
return False

class LockRelease(object):
def __init__(self, lock):
self.lock = lock

def __enter__(self):
self.lock.__exit__(None, None, None)
self.lock.release()

def __exit__(self, type, value, traceback):
self.lock.__enter__()
self.lock.acquire()
return False

@Serializable.register("malcolm:core/Block:1.0")
Expand All @@ -33,34 +43,40 @@ def __init__(self, name):
"""
super(Block, self).__init__(name=name)
self.name = name
self._methods = OrderedDict()
self._attributes = OrderedDict()
self.lock = dummy_lock()
self.methods = OrderedDict()
self.attributes = OrderedDict()
self.lock = DummyLock()

def add_attribute(self, attribute):
def add_attribute(self, attribute, notify=True):
"""Add an Attribute to the block and set the block as its parent"""
self.add_child(attribute, self.attributes)
self.on_changed([[attribute.name], attribute.to_dict()], notify)

assert attribute.name not in self._attributes, \
"Attribute %s already defined for Block %s" \
% (attribute.name, self.name)
self._attributes[attribute.name] = attribute
attribute.set_parent(self)
setattr(self, attribute.name, attribute)
self.on_changed([[attribute.name], attribute.to_dict()])
self.notify_subscribers()

def add_method(self, method):
def add_method(self, method, notify=True):
"""Add a Method to the Block
Args:
method (Method): The Method object that has already been filled in
"""
assert method.name not in self._methods, \
"Method %s already defined for Block %s" % (method.name, self.name)
self._methods[method.name] = method
setattr(self, method.name, method)
self.on_changed([[method.name], method.to_dict()])
self.notify_subscribers()
self.add_child(method, self.methods)
self.on_changed([[method.name], method.to_dict()], notify)

def add_child(self, attribute_or_method, d):
"""Add an Attribute or Method to the block and set the block as its
parent, but don't notify"""
child_name = attribute_or_method.name
assert not hasattr(self, child_name), \
"Attribute or Method %s already defined for Block %s" \
% (child_name, self.name)
setattr(self, child_name, attribute_or_method)
d[child_name] = attribute_or_method
attribute_or_method.set_parent(self)

def _where_child_stored(self, child):
if isinstance(child, Method):
return self.methods
elif isinstance(child, Attribute):
return self.attributes

def update(self, change):
"""Update block given a single change.
Expand All @@ -78,16 +94,25 @@ def update(self, change):
# sub-structure does not exist - create and add
if len(change[0]) > 1:
raise ValueError("Missing substructure at %s" % name)
obj = Serializable.from_dict(name, change[1])
if isinstance(obj, Method):
self.add_method(obj)
elif isinstance(obj, Attribute):
self.add_attribute(obj)
else:
raise ValueError(
"Change %s deserialized to unknown object %s"
% (change, obj))

child = Serializable.from_dict(name, change[1])
d = self._where_child_stored(child)
assert d is not None, \
"Change %s deserialized to unknown object %s" % (change, child)
self.add_child(child, d)

def replace_children(self, children, notify=True):
for method_name in self.methods:
delattr(self, method_name)
self.methods.clear()
for attr_name in self.attributes:
delattr(self, attr_name)
self.attributes.clear()
for child in children:
d = self._where_child_stored(child)
assert d is not None, \
"Don't know how to add a child %s" % child
self.add_child(child, d)
self.on_changed([[], self.to_dict()], notify)

def notify_subscribers(self):
if self.parent is not None:
Expand All @@ -106,11 +131,11 @@ def handle_request(self, request):
with self.lock:
if request.type_ == Request.POST:
method_name = request.endpoint[-1]
response = self._methods[method_name].get_response(request)
response = self.methods[method_name].get_response(request)
elif request.type_ == Request.PUT:
attr_name = request.endpoint[-1]
self._attributes[attr_name].put(request.value)
self._attributes[attr_name].set_value(request.value)
self.attributes[attr_name].put(request.value)
self.attributes[attr_name].set_value(request.value)
response = Response.Return(request.id_, request.context)
self.parent.block_respond(response, request.response_queue)

Expand All @@ -120,9 +145,9 @@ def to_dict(self):
d = OrderedDict()

d["typeid"] = self.typeid
for attribute_name, attribute in self._attributes.items():
for attribute_name, attribute in self.attributes.items():
d[attribute_name] = attribute.to_dict()
for method_name, method in self._methods.items():
for method_name, method in self.methods.items():
d[method_name] = method.to_dict()


Expand Down
13 changes: 10 additions & 3 deletions malcolm/core/clientcomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
class ClientComms(Loggable, Spawnable):
"""Abstract class for dispatching requests to a server and resonses to
a method"""
# The id that will be use for subscriptions to the blocks the server has
SERVER_BLOCKS_ID=0

def __init__(self, name, process):
super(ClientComms, self).__init__(logger_name=name)
Expand All @@ -24,7 +26,7 @@ def send_loop(self):
if request is Spawnable.STOP:
break
try:
request.id_ = self._current_id
request.set_id(self._current_id)
self._current_id += 1

# TODO: Move request store into new method?
Expand All @@ -44,5 +46,10 @@ def send_to_server(self, request):
"Abstract method that must be implemented by deriving class")

def send_to_caller(self, response):
request = self.requests[response.id_]
request.response_queue.put(response)
if response.id_ == self.SERVER_BLOCKS_ID:
assert response.type_ == response.UPDATE, \
"Expected server blocks Update, got %s" % response.type_
self.process.update_block_list(self, response.value)
else:
request = self.requests[response.id_]
request.response_queue.put(response)

0 comments on commit 82aa6a7

Please sign in to comment.