Skip to content

Commit

Permalink
Change Post responses to go through the process queue first.
Browse files Browse the repository at this point in the history
Fixes out-of-order messages, where an Update (triggered by a method call
that changes attributes) can arrive after the method's Return message.
  • Loading branch information
c-mita committed Jun 14, 2016
1 parent 7880ad2 commit 470db4a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 12 deletions.
5 changes: 4 additions & 1 deletion malcolm/core/block.py
@@ -1,6 +1,7 @@
from collections import OrderedDict

from malcolm.core.monitorable import Monitorable
from malcolm.core.process import BlockRespond


class Block(Monitorable):
Expand Down Expand Up @@ -55,7 +56,9 @@ def handle_request(self, request):
self.log_debug("Received request %s", request)
if request.type_ == request.POST:
method_name = request.endpoint[-1]
self._methods[method_name].handle_request(request)
response = self._methods[method_name].get_response(request)
response = BlockRespond(response, request.response_queue)
self.parent.q.put(response)
else:
layer = self
for next_link in request.endpoint[1:]:
Expand Down
7 changes: 4 additions & 3 deletions malcolm/core/method.py
Expand Up @@ -3,6 +3,7 @@

from malcolm.core.monitorable import Monitorable
from malcolm.core.mapmeta import MapMeta, OPTIONAL, REQUIRED
from malcolm.core.response import Response


class Method(Monitorable):
Expand Down Expand Up @@ -66,7 +67,7 @@ def __call__(self, *args, **kwargs):
self.returns.elements[r_name].validate(r_val)
return return_val

def handle_request(self, request):
def get_response(self, request):
"""Call exposed function using request parameters and respond with the
result
Expand All @@ -79,10 +80,10 @@ def handle_request(self, request):
except Exception as error:
self.log_debug("Error raised %s", error.message)
message = "Method %s raised an error: %s" % (self.name, error.message)
request.respond_with_error(message)
return Response.Error(request.id_, request.context, message)
else:
self.log_debug("Returning result %s", result)
request.respond_with_return(result)
return Response.Return(request.id_, request.context, value=result)

def to_dict(self):
"""Return ordered dictionary representing Method object."""
Expand Down
9 changes: 8 additions & 1 deletion malcolm/core/process.py
Expand Up @@ -11,8 +11,10 @@
# Internal update messages
BlockNotify = namedtuple("BlockNotify", "name")
BlockChanged = namedtuple("BlockChanged", "changes")
BlockRespond = namedtuple("BlockRespond", "response, response_queue")
BlockNotify.type_ = "BlockNotify"
BlockChanged.type_ = "BlockChanged"
BlockRespond.type_ = "BlockRespond"


class Process(Loggable):
Expand All @@ -35,7 +37,8 @@ def __init__(self, name, sync_factory):
Request.GET: self._handle_get,
Request.SUBSCRIBE: self._handle_subscribe,
BlockNotify.type_: self._handle_block_notify,
BlockChanged.type_: self._handle_block_changed
BlockChanged.type_: self._handle_block_changed,
BlockRespond.type_: self._handle_block_respond
}

def recv_loop(self):
Expand Down Expand Up @@ -161,6 +164,10 @@ def _handle_block_changed(self, request):
block_changes = self._last_changes.setdefault(path[0], [])
block_changes.append([path, value])

def _handle_block_respond(self, request):
"""Push the response to the required queue"""
request.response_queue.put(request.response)

def _handle_subscribe(self, request):
"""Add a new subscriber and respond with the current
sub-structure state"""
Expand Down
3 changes: 2 additions & 1 deletion tests/test_core/test_block.py
Expand Up @@ -98,6 +98,7 @@ class TestHandleRequest(unittest.TestCase):

def setUp(self):
self.block = Block("TestBlock")
self.block.parent = MagicMock()
self.method = MagicMock()
self.method.name = "get_things"
self.response = MagicMock()
Expand All @@ -111,7 +112,7 @@ def test_given_request_then_pass_to_correct_method(self):

self.block.handle_request(request)

self.method.handle_request.assert_called_once_with(request)
self.method.get_response.assert_called_once_with(request)

def test_given_get_then_return_attribute(self):
self.block.state = MagicMock()
Expand Down
13 changes: 7 additions & 6 deletions tests/test_core/test_method.py
Expand Up @@ -10,6 +10,7 @@

from malcolm.core.method import Method, takes, returns
from malcolm.core.mapmeta import OPTIONAL, REQUIRED
from malcolm.core.response import Response


class TestMethod(unittest.TestCase):
Expand Down Expand Up @@ -134,9 +135,9 @@ def test_handle_request(self):
request = Mock(
id=(123, Mock()), type="Post", parameters={"first": 2},
respond_with_return=Mock())
m.handle_request(request)
response = m.get_response(request)
func.assert_called_with({"first": 2})
request.respond_with_return.assert_called_with({"output": 1})
self.assertEquals({"output":1}, response.value)

def test_handle_request_error(self):
func = MagicMock()
Expand All @@ -147,10 +148,10 @@ def test_handle_request_error(self):
m.returns = MagicMock()
request = MagicMock()

m.handle_request(request)

request.respond_with_error.assert_called_once_with(
"Method test_method raised an error: Test error")
response = m.get_response(request)
self.assertEquals(Response.ERROR, response.type_)
self.assertEquals(
"Method test_method raised an error: Test error", response.message)

def test_to_dict_serialization(self):
func = Mock(return_value={"out": "dummy"})
Expand Down

0 comments on commit 470db4a

Please sign in to comment.