Skip to content

Commit

Permalink
Monitor socket for connection establishment
Browse files Browse the repository at this point in the history
Only once the connection has been fully made we can declare success and
let the flow continue in the thread that initiated the subscription.

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jul 7, 2020
1 parent 9db41c3 commit 67b58ac
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion daliuge-runtime/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,21 +414,33 @@ def _deliver_events(self):

def _receive_events(self, sock_created):
import zmq
from zmq.utils.monitor import recv_monitor_message

sub = self._context.socket(zmq.SUB) # @UndefinedVariable
sub.setsockopt(zmq.SUBSCRIBE, six.b('')) # @UndefinedVariable
sub_monitor = sub.get_monitor_socket()
sock_created.set()

pending_connections = {}
while self._pubsub_running:

# A new subscription has been requested
try:
subscription = self._subscriptions.get_nowait()
sub.connect(subscription.endpoint)
subscription.finished_evt.set()
pending_connections[subscription.endpoint] = subscription.finished_evt
except Queue.Empty:
pass

try:
msg = recv_monitor_message(sub_monitor, flags=zmq.NOBLOCK)
if msg['event'] != zmq.EVENT_CONNECTED:
continue
finished_evt = pending_connections.pop(utils.b2s(msg['endpoint']))
finished_evt.set()
except zmq.error.Again:
pass

try:
evt = sub.recv_pyobj(flags = zmq.NOBLOCK) # @UndefinedVariable
self._events_in.put(evt)
Expand All @@ -438,6 +450,8 @@ def _receive_events(self, sock_created):
# Figure out what to do here
logger.exception("Something bad happened in %s:%d to ZMQ :'(", self._events_host, self._events_port)
break

sub_monitor.close()
sub.close()


Expand Down

0 comments on commit 67b58ac

Please sign in to comment.