Skip to content

Commit

Permalink
Try to fix race condition in spawning comms loops
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Aug 24, 2016
1 parent 5e1d202 commit 58b7146
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 22 deletions.
7 changes: 2 additions & 5 deletions malcolm/comms/pva/pvaclientcomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ def __init__(self, process, _=None):
name (str): Name for logging
process (Process): Process for primitive creation
"""
name = "PvaClientComms"
super(PvaClientComms, self).__init__(name, process)

self.name = name
self.process = process
self.name = "PvaClientComms"
super(PvaClientComms, self).__init__(self.name, process)

def send_to_server(self, request):
"""Dispatch a request to the server
Expand Down
16 changes: 7 additions & 9 deletions malcolm/comms/pva/pvaservercomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ class PvaServerComms(ServerComms):
CACHE_UPDATE = 0

def __init__(self, process, _=None):
name = "PvaServerComms"
super(PvaServerComms, self).__init__(name, process)

self.name = name
self.process = process
self.name = "PvaServerComms"

self._lock = RLock()

Expand All @@ -41,14 +37,16 @@ def __init__(self, process, _=None):
# Create the V4 PVA server object
self.create_pva_server()

# Add a thread for executing the V4 PVA server
self.add_spawn_function(self.start_pva_server)

super(PvaServerComms, self).__init__(self.name, process)

# Set up the subscription for everything (root down)
request = Subscribe(None, self.q, [], True)
request.set_id(self._root_id)
self.process.q.put(request)

# Add a thread for executing the V4 PVA server
self.add_spawn_function(self.start_pva_server)

def _get_unique_id(self):
with self._lock:
self._current_id += 1
Expand Down Expand Up @@ -105,7 +103,7 @@ def _add_new_pva_channel(self, block):
self._endpoints[block] = PvaEndpoint(self.name, block, self._server, self)

def create_pva_server(self):
self.log_debug("Creating PVA server object")
#self.log_debug("Creating PVA server object")
self._server = pvaccess.PvaServer()

def start_pva_server(self):
Expand Down
2 changes: 1 addition & 1 deletion malcolm/comms/websocket/websocketclientcomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def __init__(self, process, params):
url (str): Url for websocket connection. E.g. ws://localhost:8888/ws
"""
url = "ws://%(hostname)s:%(port)d/ws" % params
super(WebsocketClientComms, self).__init__(url, process)
self.url = url
# TODO: Are we starting one or more IOLoops here?
self.loop = IOLoop.current()
self.conn = websocket_connect(
url, callback=self.subscribe_server_blocks,
on_message_callback=self.on_message)
self.add_spawn_function(self.loop.start, self.stop_recv_loop)
super(WebsocketClientComms, self).__init__(url, process)

def on_message(self, message):
"""
Expand Down
7 changes: 2 additions & 5 deletions malcolm/comms/websocket/websocketservercomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ class WebsocketServerComms(ServerComms):
"""A class for communication between browser and server"""

def __init__(self, process, params):
logger_name = "WebsocketServerComms(%(port)d)" % params
super(WebsocketServerComms, self).__init__(logger_name, process)

self.process = process

MalcWebSocketHandler.servercomms = self
MalcBlockHandler.servercomms = self

Expand All @@ -71,6 +66,8 @@ def __init__(self, process, params):
self.server.listen(int(params["port"]))
self.loop = IOLoop.current()
self.add_spawn_function(self.loop.start, self.stop_recv_loop)
logger_name = "WebsocketServerComms(%(port)d)" % params
super(WebsocketServerComms, self).__init__(logger_name, process)

def send_to_client(self, response):
"""Dispatch response to a client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_start(self, ioloop_mock, _):
self.WS.process.spawn = MagicMock()
self.WS.start()

self.assertEqual([call(self.WS.send_loop), call(self.WS.loop.start)],
self.assertEqual([call(self.WS.loop.start), call(self.WS.send_loop)],
self.WS.process.spawn.call_args_list)

@patch('malcolm.comms.websocket.websocketclientcomms.websocket_connect')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_start(self, _, _2):
self.WS = WebsocketServerComms(self.p, dict(port=1))
self.WS.start()

self.assertEqual([call(self.WS.send_loop), call(self.WS.loop.start)],
self.assertEqual([call(self.WS.loop.start), call(self.WS.send_loop)],
self.p.spawn.call_args_list)

@patch('malcolm.comms.websocket.websocketservercomms.HTTPServer')
Expand Down

0 comments on commit 58b7146

Please sign in to comment.