Skip to content

Commit

Permalink
Monitor socket for connection establishment
Browse files Browse the repository at this point in the history
Only once the connection has been fully made we can declare success and
let the flow continue in the thread that initiated the subscription.

Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
  • Loading branch information
rtobar committed Jul 3, 2020
1 parent de1d3a7 commit a82a424
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions daliuge-runtime/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,29 +403,40 @@ def _publish_events(self, sock_created):

def _receive_events(self, sock_created):
import zmq
from zmq.utils.monitor import recv_monitor_message

sub = self._context.socket(zmq.SUB) # @UndefinedVariable
sub.setsockopt(zmq.SUBSCRIBE, six.b('')) # @UndefinedVariable
sub_monitor = sub.get_monitor_socket()
poller = zmq.Poller() # @UndefinedVariable
poller.register(sub, zmq.POLLIN) # @UndefinedVariable
poller.register(sub_monitor, zmq.POLLIN) # @UndefinedVariable
sock_created.set()

pending_connections = {}
while self._pubsub_running:

# A new subscription has been requested
try:
subscription = self._subscriptions.get(block=True, timeout=0.01)
sub.connect(subscription.endpoint)
subscription.finished_evt.set()
pending_connections[subscription.endpoint] = subscription.finished_evt
except Queue.Empty:
pass

if not self._pubsub_running:
break
for _ in poller.poll(100):
evt = sub.recv_pyobj(flags=zmq.NOBLOCK)
self._events_in.put(evt)
logger.debug('Put incoming event in queue for delivery: %r', evt)
for sock, _ in poller.poll(100):
if sock == sub_monitor:
msg = recv_monitor_message(sock, flags=zmq.NOBLOCK)
if msg['event'] != zmq.EVENT_CONNECTED:
continue
finished_evt = pending_connections.pop(utils.b2s(msg['endpoint']))
finished_evt.set()
elif sock == sub:
evt = sub.recv_pyobj(flags=zmq.NOBLOCK)
self._events_in.put(evt)
sub_monitor.close()
sub.close()

def _deliver_events(self):
Expand Down

0 comments on commit a82a424

Please sign in to comment.