Skip to content

Commit

Permalink
Convert ClientComms and ServerComms to use Spawnable mixin
Browse files Browse the repository at this point in the history
  • Loading branch information
c-mita committed Jun 22, 2016
1 parent 13c1c02 commit 2eb5d3d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 127 deletions.
28 changes: 6 additions & 22 deletions malcolm/core/clientcomms.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
from collections import OrderedDict

from malcolm.core.loggable import Loggable
from malcolm.core.spawnable import Spawnable

# Sentinel object to stop the send loop
CLIENT_STOP = object()


class ClientComms(Loggable):
class ClientComms(Loggable, Spawnable):
"""Abstract class for dispatching requests to a server and resonses to
a method"""

def __init__(self, name, process):
super(ClientComms, self).__init__(logger_name=name)
self.process = process
self.q = self.process.create_queue()
self._send_spawned = None
self._current_id = 1
self.requests = OrderedDict()
self.add_spawn_function(self.send_loop,
self.make_default_stop_func(self.q))
self.add_spawn_function(self.start_recv_loop, self.stop_recv_loop)

def send_loop(self):
"""Service self.q, sending requests to server"""
while True:
request = self.q.get()
if request is CLIENT_STOP:
if request is Spawnable.STOP:
break
try:
request.id_ = self._current_id
Expand All @@ -48,22 +48,6 @@ def send_to_caller(self, response):
request = self.requests[response.id_]
request.response_queue.put(response)

def start(self):
"""Start communications"""
self._send_spawned = self.process.spawn(self.send_loop)
self.start_recv_loop()

def stop(self, timeout=None):
"""Request all communications be stopped and wait for finish
Args:
timeout (float): Time in seconds to wait for comms to stop.
None means wait forever.
"""
self.q.put(CLIENT_STOP)
self._send_spawned.wait(timeout=timeout)
self.stop_recv_loop()

def start_recv_loop(self):
"""Abstract method to start a receive loop to dispatch responses to a
a Method"""
Expand Down
28 changes: 6 additions & 22 deletions malcolm/core/servercomms.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
from malcolm.core.loggable import Loggable
from malcolm.core.spawnable import Spawnable

# Sentinel object to stop the send loop
SERVER_STOP = object()


class ServerComms(Loggable):
class ServerComms(Loggable, Spawnable):
"""Abstract class for dispatching requests to a process and responses to a
client"""

def __init__(self, name, process):
super(ServerComms, self).__init__(logger_name=name)
self.process = process
self.q = self.process.create_queue()
self._send_spawned = None
self.add_spawn_function(self.send_loop,
self.make_default_stop_func(self.q))
self.add_spawn_function(self.start_recv_loop, self.stop_recv_loop)

def send_loop(self):
"""Service self.q, sending responses to client"""
while True:
response = self.q.get()
if response is SERVER_STOP:
if response is Spawnable.STOP:
break
try:
self.send_to_client(response)
Expand All @@ -39,28 +39,12 @@ def send_to_process(self, request):
"""Send request to process"""
self.process.q.put(request)

def start(self):
"""Start communications"""
self._send_spawned = self.process.spawn(self.send_loop)
self.start_recv_loop()

def start_recv_loop(self):
"""Abstract method to start a recieve loop to dispatch requests to
Process"""
raise NotImplementedError(
"Abstract method that must be implemented by deriving class")

def stop(self, timeout=None):
"""Request all communications be stopped and wait for finish
Args:
timeout (float): Time in seconds to wait for comms to stop.
None means wait forever.
"""
self.q.put(SERVER_STOP)
self._send_spawned.wait(timeout=timeout)
self.stop_recv_loop()

def stop_recv_loop(self):
"""Abstract method to stop the receive loop created by
start_recv_loop"""
Expand Down
51 changes: 14 additions & 37 deletions tests/test_core/test_clientcomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@
from collections import OrderedDict

from . import util
from mock import Mock
from mock import Mock, patch, call

from malcolm.core.clientcomms import ClientComms, CLIENT_STOP
from malcolm.core.clientcomms import ClientComms
from malcolm.core.syncfactory import SyncFactory

class TestClientComms(unittest.TestCase):
def test_init(self):
@patch("malcolm.core.clientcomms.ClientComms.add_spawn_function")
@patch("malcolm.core.clientcomms.ClientComms.make_default_stop_func")
def test_init(self, def_stop, add_func):
process = Mock()
client = ClientComms("c", process)
process.create_queue.assert_called_once_with()
self.assertEqual(client.q, process.create_queue.return_value)
spawn_function_calls = client.add_spawn_function.call_args_list
self.assertEquals(
[call(client.send_loop, def_stop.return_value),
call(client.start_recv_loop, client.stop_recv_loop)],
spawn_function_calls)

def test_not_implemented_error(self):
client = ClientComms("c", Mock())
Expand All @@ -25,7 +32,7 @@ def test_send_logs_error(self):
client.send_to_server = Mock(side_effect=Exception)
request = Mock()
request.to_dict = Mock(return_value = "<to_dict>")
client.q.get = Mock(side_effect = [request, CLIENT_STOP])
client.q.get = Mock(side_effect = [request, client.STOP])
client.log_exception = Mock()
client.send_loop()
client.log_exception.assert_called_once_with(
Expand All @@ -36,58 +43,28 @@ def test_requests_are_stored(self):
client._current_id = 1234
request = Mock()
client.send_to_server = Mock()
client.q.get = Mock(side_effect = [request, CLIENT_STOP])
client.q.get = Mock(side_effect = [request, client.STOP])
client.send_loop()
expected = OrderedDict({1234 : request})
self.assertEquals(expected, client.requests)

def test_loop_starts(self):
process = Mock(spawn = lambda x: x())
client = ClientComms("c", process)
client.send_loop = Mock()
client.start_recv_loop = Mock()
client.log_exception = Mock()
client.start()
client.send_loop.assert_called_once_with()
client.start_recv_loop.assert_called_once_with()
client.log_exception.assert_not_called()

def test_sends_to_server(self):
client = ClientComms("c", Mock())
client.send_to_server = Mock()
request = Mock()
client.q.get = Mock(side_effect = [request, CLIENT_STOP])
client.q.get = Mock(side_effect = [request, client.STOP])
client.log_exception = Mock()
client.send_loop()
client.send_to_server.assert_called_once_with(request)
client.log_exception.assert_not_called()

def test_start_stop(self):
sync_factory = SyncFactory("s")
process = Mock()
process.spawn = sync_factory.spawn
process.create_queue = sync_factory.create_queue
client = ClientComms("c", process)
client.send_loop = Mock(side_effect = client.send_loop)
client.start_recv_loop = Mock()
client.stop_recv_loop = Mock()
client.log_exception = Mock()
client.start()
self.assertFalse(client._send_spawned.ready())
client.start_recv_loop.assert_called_once_with()
client.stop(0.1)
self.assertTrue(client._send_spawned.ready())
client.send_loop.assert_called_once_with()
client.stop_recv_loop.assert_called_once_with()
client.log_exception.assert_not_called()

def test_request_id_provided(self):
client = ClientComms("c", Mock())
client._current_id = 1234
client.send_to_server = Mock()
request_1 = Mock(id_ = None)
request_2 = Mock(id_ = None)
client.q.get = Mock(side_effect = [request_1, request_2, CLIENT_STOP])
client.q.get = Mock(side_effect = [request_1, request_2, client.STOP])
client.send_loop()
self.assertEqual(1234, request_1.id_)
self.assertEqual(1235, request_2.id_)
Expand Down
60 changes: 14 additions & 46 deletions tests/test_core/test_servercomms.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,43 @@
import unittest

from . import util
from mock import Mock
from mock import Mock, patch, call

from malcolm.core.servercomms import ServerComms, SERVER_STOP
from malcolm.core.servercomms import ServerComms
from malcolm.core.spawnable import Spawnable
from malcolm.core.syncfactory import SyncFactory

class TestServerComms(unittest.TestCase):

def setUp(self):
self.process = Mock()

def test_init(self):
@patch("malcolm.core.servercomms.ServerComms.add_spawn_function")
@patch("malcolm.core.servercomms.ServerComms.make_default_stop_func")
def test_init(self, def_stop, add_func):
server = ServerComms("server", self.process)
self.process.create_queue.assert_called_once_with()
self.assertEqual(
server.q, self.process.create_queue.return_value)
self.assertEquals(
[call(server.send_loop, def_stop.return_value),
call(server.start_recv_loop, server.stop_recv_loop)],
server.add_spawn_function.call_args_list)

def test_not_implemented_error(self):
server = ServerComms("server", self.process)
self.assertRaises(NotImplementedError, server.send_to_client, Mock())
self.assertRaises(NotImplementedError, server.start_recv_loop)
self.assertRaises(NotImplementedError, server.stop_recv_loop)

def test_loop_starts(self):
self.process.spawn = lambda x: x()
server = ServerComms("server", self.process)
server.send_loop = Mock()
server.start_recv_loop = Mock()
server.start()
server.send_loop.assert_called_once_with()
server.start_recv_loop.assert_called_once_with()

def test_loop_stops(self):
self.process.spawn = lambda x: x()
self.process.create_queue = Mock(
return_value=Mock(get=Mock(return_value=SERVER_STOP)))
server = ServerComms("server", self.process)
server.start_recv_loop = Mock()
server.stop_recv_loop = Mock()
server.send_loop = Mock(side_effect = server.send_loop)
server.start()
server.send_loop.assert_called_once_with()

def test_start_stop(self):
self.process.sync_factory = SyncFactory("s")
self.process.spawn = self.process.sync_factory.spawn
self.process.create_queue = self.process.sync_factory.create_queue
server = ServerComms("server", self.process)
server.send_loop = Mock(side_effect = server.send_loop)
server.start_recv_loop = Mock()
server.stop_recv_loop = Mock()
server.start()
self.assertFalse(server._send_spawned.ready())
server.start_recv_loop.assert_called_once_with()
server.stop(0.1)
self.assertTrue(server._send_spawned.ready())
server.send_loop.assert_called_once_with()
server.stop_recv_loop.assert_called_once_with()

def test_send_to_client(self):
def test_send_to_client_called(self):
request = Mock()
dummy_queue = Mock()
dummy_queue.get = Mock(side_effect = [request, SERVER_STOP])
dummy_queue.get = Mock(side_effect = [request, Spawnable.STOP])
self.process.create_queue = Mock(return_value = dummy_queue)
self.process.spawn = Mock(side_effect = lambda x: x())
server = ServerComms("server", self.process)
server.send_to_client = Mock(
side_effect = server.send_to_client)
server.start_recv_loop = Mock()
server.start()
server.send_to_client = Mock()
server.send_loop()
server.send_to_client.assert_called_once_with(request)

def test_send_to_process(self):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_wscomms/test_system_wscomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ def setUp(self):
def tearDown(self):
if hasattr(self, "cc"):
self.cc.stop()
self.cc.wait()
self.sc.stop()
self.sc.wait()
self.process.stop()

@gen.coroutine
Expand Down

0 comments on commit 2eb5d3d

Please sign in to comment.