Skip to content

Commit

Permalink
Rewrite client controller to do subscribes
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl authored and c-mita committed Jul 1, 2016
1 parent a3d7881 commit 28bcce6
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 88 deletions.
77 changes: 47 additions & 30 deletions malcolm/controllers/clientcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,64 @@
from malcolm.core.controller import Controller
from malcolm.core.request import Request
from malcolm.core.method import Method
from malcolm.core.serializable import Serializable


class ClientController(Controller):
"""Sync a local block with a given remote block"""
REMOTE_BLOCKS_ID = 0
BLOCK_ID = 1

def __init__(self, process, block):
"""
Args:
process (Process): The process this should run under
block (Block): The local block we should be controlling
client_comms (ClientComms): Should be already connected to a server
hosting the remote block
"""
self.q = process.create_queue()
self.client_comms = process.get_client_comms(block.name)
assert self.client_comms, \
"Process doesn't know about block %s" % block.name
# Call this last as it calls create_methods
super(ClientController, self).__init__(block=block)
self.process = process
request = Request.Subscribe(
None, self, [process.name, "remoteBlocks", "value"])
request.set_id(self.REMOTE_BLOCKS_ID)
self.process.q.put(request)

def create_methods(self):
"""Get methods from remote block and mirror them internally"""
request = Request.Get(None, self.q, [self.block.name])
self.client_comms.q.put(request)
self.log_debug("Waiting for response to Get %s", self.block.name)
response = self.q.get()
# Find all the methods
for aname, amap in response.value.items():
# TODO: If it has "takes" it's a method, flaky...
if "takes" in amap:
yield self.wrap_method(aname, amap)
def put(self, response):
"""We don't have a queue as no thread to service, but act like one"""
if response.id_ == self.REMOTE_BLOCKS_ID:
if response.value and self.block.name in response.value:
# process knows how to get to a block
self._subscribe_to_block(self.block.name)
elif response.id_ == self.BLOCK_ID:
with self.block.lock:
self.log_debug(response)
for change in response.changes:
if change[0] == []:
# update root
self._regenerate_block(change[1])
else:
# just pass it to the block to handle
self.block.update(change)

def wrap_method(self, method_name, method_map):
"""Take the serialized method map and create a Method from it
def _regenerate_block(self, d):
children = []
for k, v in d.items():
if k == "typeid":
continue
child = Serializable.from_dict(k, v)
children.append(child)
if isinstance(child, Method):
# calling method forwards to server
child.set_function(
functools.partial(self.call_server_method, k))
self.block.replace_children(children)

Args:
method_map (dict): Serialized Method
"""
method = Method.from_dict(method_name, method_map)
method.set_function(
functools.partial(self.call_server_method, method_name))
self.log_debug("Wrapping method %s", method_name)
return method
def _subscribe_to_block(self, block_name):
self.client_comms = self.process.get_client_comms(block_name)
assert self.client_comms, \
"Process doesn't know about block %s" % block_name
request = Request.Subscribe(None, self, [block_name], delta=True)
request.set_id(self.BLOCK_ID)
self.client_comms.q.put(request)

def call_server_method(self, method_name, parameters, returns):
"""Call method_name on the server
Expand All @@ -56,10 +71,12 @@ def call_server_method(self, method_name, parameters, returns):
returns (Map): Returns map to fill and return
"""
self.log_debug(dict(parameters))
request = Request.Post(None, self.q,
q = self.process.create_queue()
request = Request.Post(None, q,
[self.block.name, method_name], parameters)
self.client_comms.q.put(request)
response = self.q.get()
with self.block.lock_released():
response = q.get()
assert response.type_ == response.RETURN, \
"Expected Return, got %s" % response.type_
returns.update(response.value)
Expand Down
88 changes: 35 additions & 53 deletions tests/test_controllers/test_clientcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,73 +9,54 @@
from mock import MagicMock, patch

# logging
import logging
logging.basicConfig(level=logging.DEBUG)
# import logging
# logging.basicConfig(level=logging.DEBUG)

# module imports
from malcolm.controllers.clientcontroller import ClientController
from malcolm.core.process import Process, BlockList
from malcolm.controllers.hellocontroller import HelloController
from malcolm.core.block import Block
from malcolm.core.stringmeta import StringMeta
from malcolm.core.syncfactory import SyncFactory

from malcolm.compat import Queue

class TestClientController(unittest.TestCase):

def setUp(self):
s = SyncFactory("sync")
self.p = Process("process", s)
# Serialized version of the block we want
source = Block("blockname")
HelloController(source)
self.serialized = source.to_dict()
# Setup client controller prerequisites
self.b = Block("blockname")
self.p = MagicMock()
self.comms = MagicMock()
serialized = dict(
say_hello=dict(
description="Says hello",
tags=[],
takes=dict(
description="Hello takes",
tags=[],
elements=dict(
name=dict(
description="A name",
tags=["tag"],
writeable=False,
typeid="malcolm:core/StringMeta:1.0",
label="label",
),
),
required=["name"],
),
defaults={},
returns=dict(
description="Hello returns",
tags=[],
elements=dict(
greeting=dict(
description="A greeting",
tags=["tag"],
writeable=False,
typeid="malcolm:core/StringMeta:1.0",
label="label",
),
),
required=["response"],
),
writeable=True,
typeid="malcolm:core/Method:1.0",
label="label",
),
)

def f(request):
request.respond_with_return(serialized)

self.comms.q.put.side_effect = f
self.p._handle_block_list(BlockList(client_comms=self.comms, blocks=["blockname"]))
self.cc = ClientController(self.p, self.b)
# get process to give us comms
self.p.get_client_comms.return_value = self.comms
# tell our controller which blocks the process can talk to
response = MagicMock(id_=self.cc.REMOTE_BLOCKS_ID, value=["blockname"])
self.cc.put(response)
# tell our controller the serialized state of the block
response = MagicMock(id_=self.cc.BLOCK_ID, changes=[[[], self.serialized]])
self.cc.put(response)

def test_init(self):
self.assertEqual(self.p.q.put.call_count, 1)
req = self.p.q.put.call_args[0][0]
self.assertEqual(req.type_, "Subscribe")
self.assertEqual(req.endpoint, [self.p.name, "remoteBlocks", "value"])
self.assertEqual(req.response_queue, self.cc)
self.p.get_client_comms.assert_called_with("blockname")
self.assertEqual(self.comms.q.put.call_count, 1)
req = self.comms.q.put.call_args[0][0]
self.assertEqual(req.type_, "Subscribe")
self.assertEqual(req.delta, True)
self.assertEqual(req.response_queue, self.cc)
self.assertEqual(req.endpoint, ["blockname"])

def test_methods_created(self):
self.assertEqual(list(self.b._methods), ["say_hello"])
m = self.b._methods["say_hello"]
self.assertEqual(list(self.b.methods), ["say_hello"])
m = self.b.methods["say_hello"]
self.assertEqual(m.name, "say_hello")
self.assertEqual(list(m.takes.elements), ["name"])
self.assertEqual(type(m.takes.elements["name"]), StringMeta)
Expand All @@ -84,6 +65,7 @@ def test_methods_created(self):
self.assertEqual(m.defaults, {})

def test_call_method(self):
self.p.create_queue.return_value = Queue()
def f(request):
request.respond_with_return(dict(
greeting="Hello %s" % request.parameters.name))
Expand Down
8 changes: 3 additions & 5 deletions tests/test_wscomms/test_system_wscomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# tornado
from tornado.websocket import websocket_connect
from tornado import gen
from tornado.ioloop import IOLoop
import json


Expand All @@ -23,7 +22,6 @@
from malcolm.core.block import Block
from malcolm.core.process import Process
from malcolm.core.syncfactory import SyncFactory
from malcolm.core.request import Request
from malcolm.wscomms.wsservercomms import WSServerComms
from malcolm.wscomms.wsclientcomms import WSClientComms

Expand Down Expand Up @@ -76,12 +74,12 @@ def test_server_and_simple_client(self):
def test_server_with_malcolm_client(self):
self.cc = WSClientComms("cc", self.process, "ws://localhost:8888/ws")
self.cc.start()
# Wait for comms to be connected
# while not self.cc.conn.done():
time.sleep(1)
# Don't add to process as we already have a block of that name
block2 = Block("hello")
ClientController(self.process, block2)
# Normally we would wait for it to be connected here, but it isn't
# attached to a process so just sleep for a bit
time.sleep(0.1)
ret = block2.say_hello("me2")
self.assertEqual(ret, dict(greeting="Hello me2"))

Expand Down

0 comments on commit 28bcce6

Please sign in to comment.