Skip to content

Commit

Permalink
Merge pull request #46 from dls-controls/subscribing
Browse files Browse the repository at this point in the history
Subscribing
  • Loading branch information
coretl committed Jun 14, 2016
2 parents ad037fe + b4025fa commit 7880ad2
Show file tree
Hide file tree
Showing 11 changed files with 480 additions and 17 deletions.
14 changes: 11 additions & 3 deletions malcolm/core/block.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from collections import OrderedDict

from malcolm.core.loggable import Loggable
from malcolm.core.monitorable import Monitorable


class Block(Loggable):
class Block(Monitorable):
"""Object consisting of a number of Attributes and Methods"""

def __init__(self, name):
"""
Args:
name (str): Block name e.g. "BL18I:ZEBRA1"
"""
super(Block, self).__init__(logger_name=name)
super(Block, self).__init__(name=name)
self.name = name
self._methods = OrderedDict()
self._attributes = OrderedDict()
Expand All @@ -25,6 +25,8 @@ def add_attribute(self, attribute):
self._attributes[attribute.name] = attribute
attribute.set_parent(self)
setattr(self, attribute.name, attribute)
self.on_changed([[[attribute.name], attribute.to_dict()]])
self.notify_subscribers()

def add_method(self, method):
"""Add a Method to the Block
Expand All @@ -36,6 +38,12 @@ def add_method(self, method):
"Method %s already defined for Block %s" % (method.name, self.name)
self._methods[method.name] = method
setattr(self, method.name, method)
self.on_changed([[[method.name], method.to_dict()]])
self.notify_subscribers()

def notify_subscribers(self):
if self.parent is not None:
self.parent.notify_subscribers(self.name)

def handle_request(self, request):
"""
Expand Down
112 changes: 104 additions & 8 deletions malcolm/core/process.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from collections import OrderedDict
from collections import OrderedDict, namedtuple

from malcolm.core.loggable import Loggable
from malcolm.core.request import Request
from malcolm.core.response import Response


# Sentinel object that when received stops the recv_loop
PROCESS_STOP = object()

# Internal update messages
BlockNotify = namedtuple("BlockNotify", "name")
BlockChanged = namedtuple("BlockChanged", "changes")
BlockNotify.type_ = "BlockNotify"
BlockChanged.type_ = "BlockChanged"


class Process(Loggable):
"""Hosts a number of Blocks, distributing requests between them"""
Expand All @@ -15,10 +23,20 @@ def __init__(self, name, sync_factory):
self.name = name
self.sync_factory = sync_factory
self.q = self.create_queue()
# map block name -> block object
self._blocks = OrderedDict()
self._blocks = OrderedDict() # block name -> block
self._block_state_cache = OrderedDict()
self._recv_spawned = None
self._other_spawned = []
self._subscriptions = OrderedDict() # block name -> list of subs
self._last_changes = OrderedDict() # block name -> list of changes
self._handle_functions = {
Request.POST: self._forward_block_request,
Request.PUT: self._forward_block_request,
Request.GET: self._handle_get,
Request.SUBSCRIBE: self._handle_subscribe,
BlockNotify.type_: self._handle_block_notify,
BlockChanged.type_: self._handle_block_changed
}

def recv_loop(self):
"""Service self.q, distributing the requests to the right block"""
Expand All @@ -29,11 +47,9 @@ def recv_loop(self):
# Got the sentinel, stop immediately
break
try:
self.handle_request(request)
self._handle_functions[request.type_](request)
except Exception:
# TODO: request.respond_with_error()
self.log_exception("Exception while handling %s",
request.to_dict())
self.log_exception("Exception while handling %s", request)

def start(self):
"""Start the process going"""
Expand All @@ -54,7 +70,7 @@ def stop(self, timeout=None):
for s in self._other_spawned:
s.wait(timeout=timeout)

def handle_request(self, request):
def _forward_block_request(self, request):
"""Lookup target Block and spawn block.handle_request(request)
Args:
Expand All @@ -74,6 +90,8 @@ def add_block(self, block):
assert block.name not in self._blocks, \
"There is already a block called %s" % block.name
self._blocks[block.name] = block
self._block_state_cache[block.name] = block.to_dict()
block.parent = self

def create_queue(self):
"""
Expand All @@ -91,3 +109,81 @@ def spawn(self, function, *args, **kwargs):
self._other_spawned.append(spawned)
return spawned

def _handle_block_notify(self, request):
"""Update subscribers with changes and applies stored changes to the
cached structure"""
# update cached dict
for path, value in self._last_changes.setdefault(request.name, []):
d = self._block_state_cache
for p in path[:-1]:
d = d[p]
d[path[-1]] = value

for subscription in self._subscriptions.setdefault(request.name, []):
endpoint = subscription.endpoint
# find stuff that's changed that is relevant to this subscriber
changes = []
for change_path, change_value in self._last_changes[request.name]:
# look for a change_path where the beginning matches the
# endpoint path, then strip away the matching part and add
# to the change set
i = 0
for (cp_element, ep_element) in zip(change_path, endpoint):
if cp_element != ep_element:
break
i += 1
else:
# change has matching path, so keep it
# but strip off the end point path
filtered_change = [change_path[i:], change_value]
changes.append(filtered_change)
if len(changes) > 0:
if subscription.delta:
# respond with the filtered changes
response = Response.Delta(
subscription.id_, subscription.context, changes)
subscription.response_queue.put(response)
else:
# respond with the structure of everything
# below the endpoint
update = self._block_state_cache
for p in endpoint:
update = update[p]
response = Response.Update(
subscription.id_, subscription.context, update)
subscription.response_queue.put(response)
self._last_changes[request.name] = []

def _handle_block_changed(self, request):
"""Record changes to made to a block"""
for path, value in request.changes:
# update changes
block_changes = self._last_changes.setdefault(path[0], [])
block_changes.append([path, value])

def _handle_subscribe(self, request):
"""Add a new subscriber and respond with the current
sub-structure state"""
subs = self._subscriptions.setdefault(request.endpoint[0], [])
subs.append(request)
d = self._block_state_cache
for p in request.endpoint:
d = d[p]
if request.delta:
request.respond_with_delta([[[], d]])
else:
request.respond_with_update(d)

def _handle_get(self, request):
layer = self._block_state_cache[request.endpoint[0]]
for p in request.endpoint[1:]:
layer = layer[p]
result = layer.to_dict() if hasattr(layer, "to_dict") else layer
response = Response.Return(request.id_, request.context, result)
request.response_queue.put(response)

def notify_subscribers(self, block_name):
self.q.put(BlockNotify(name=block_name))

def on_changed(self, changes):
self.q.put(BlockChanged(changes=changes))
44 changes: 44 additions & 0 deletions malcolm/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ class Request(object):
"""An object to interact with the attributes of a Block"""

POST = "Post"
PUT = "Put"
GET = "Get"
SUBSCRIBE = "Subscribe"

def __init__(self, context, response_queue, type_):
"""
Expand Down Expand Up @@ -56,6 +59,26 @@ def respond_with_error(self, message):
response = Response.Error(self.id_, self.context, message=message)
self.response_queue.put(response)

def respond_with_update(self, value):
"""
Create an Update Response object to handle the request
Args:
value (dict): Dictionary describing the new structure
"""
response = Response.Update(self.id_, self.context, value=value)
self.response_queue.put(response)

def respond_with_delta(self, changes):
"""
Create a Delta Response object to handle the request
Args:
changes (list): list of [[path], value] pairs for changed values
"""
response = Response.Delta(self.id_, self.context, changes=changes)
self.response_queue.put(response)

@classmethod
def Get(cls, context, response_queue, endpoint):
"""
Expand Down Expand Up @@ -98,6 +121,24 @@ def Post(cls, context, response_queue, endpoint, parameters=None):

return request

@classmethod
def Subscribe(cls, context, response_queue, endpoint, delta=False):
"""Create a Subscribe Request object
Args:
context: Context of Subscribe
response_queue (Queue): Queue to return to
endpoint (list[str]): Path to target
delta (bool): Notify of differences only (default False)
Returns:
Subscribe object
"""
request = Request(context, response_queue, type_="Subscribe")
request.fields["endpoint"] = endpoint
request.fields["delta"] = delta
return request

def to_dict(self):
"""Convert object attributes into a dictionary"""

Expand All @@ -122,3 +163,6 @@ def from_dict(cls, d):
for field in [f for f in d.keys() if f not in ["id", "type"]]:
request.fields[field] = d[field]
return request

def __repr__(self):
return self.to_dict().__repr__()
34 changes: 34 additions & 0 deletions malcolm/core/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
class Response(object):
"""Represents a response to a message"""
RETURN = "Return"
DELTA = "Delta"
UPDATE = "Update"
ERROR = "Error"

def __init__(self, id_, context, type_):
self.id_ = id_
Expand All @@ -22,6 +25,9 @@ def to_dict(self):
def __getattr__(self, attr):
return self.fields[attr]

def __repr__(self):
return self.to_dict().__repr__()

@classmethod
def Return(cls, id_, context, value=None):
"""Create a Return Response object with the provided parameters.
Expand Down Expand Up @@ -50,6 +56,34 @@ def Error(cls, id_, context, message):
response.fields["message"] = message
return response

@classmethod
def Update(cls, id_, context, value):
"""
Create an Update Response object with the provided parameters.
Args:
id_ (int): id from intial message
context: Context associated with id
value (dict): Serialized state of update object
"""
response = cls(id_, context, "Update")
response.fields["value"] = value
return response

@classmethod
def Delta(cls, id_, context, changes):
"""
Create a Delta Response object with the provided parameters.
Args:
id_ (int): id from initial message
context: Context associated with id
changes (list): list of [[path], value] pairs for changed values
"""
response = cls(id_, context, "Delta")
response.fields["changes"] = changes
return response

@classmethod
def from_dict(cls, d):
"""Create a Response instance from a serialized version
Expand Down
2 changes: 1 addition & 1 deletion malcolm/wscomms/wsservercomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def on_request(self, request):
"""

request.response_queue = self.q
self.process.handle_request(request)
self.process.q.put(request)

def start_recv_loop(self):
"""Start a receive loop to dispatch requests to Process"""
Expand Down
10 changes: 10 additions & 0 deletions tests/test_core/test_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,28 @@ def test_init(self):

def test_add_method_registers(self):
b = Block("blockname")
b.on_changed = MagicMock(side_effect=b.on_changed)
m = MagicMock()
m.name = "mymethod"
b.add_method(m)
self.assertEqual(b._methods.keys(), ["mymethod"])
self.assertFalse(m.called)
b.on_changed.assert_called_with([[[m.name], m.to_dict.return_value]])
m.return_value = 42
self.assertEqual(b.mymethod(), 42)
m.assert_called_once_with()

def test_add_attribute(self):
b = Block("blockname")
b.on_changed = MagicMock(side_effect=b.on_changed)
attr = MagicMock()
attr.name = "attr"
b.add_attribute(attr)
attr.set_parent.assert_called_once_with(b)
self.assertEqual({"attr":attr}, b._attributes)
self.assertIs(attr, b.attr)
b.on_changed.assert_called_with(
[[[attr.name], attr.to_dict.return_value]])

class TestToDict(unittest.TestCase):

Expand Down Expand Up @@ -71,6 +76,11 @@ def test_returns_dict(self):
block.add_attribute(a1)
block.add_attribute(a2)

m1.reset_mock()
m2.reset_mock()
a1.reset_mock()
a2.reset_mock()

expected_dict = OrderedDict()
expected_dict['attr_one'] = a1dict
expected_dict['attr_two'] = a2dict
Expand Down

0 comments on commit 7880ad2

Please sign in to comment.