Skip to content

Commit

Permalink
Merge 67b58ac into c86850e
Browse files Browse the repository at this point in the history
  • Loading branch information
rtobar committed Jul 7, 2020
2 parents c86850e + 67b58ac commit 21cbacb
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 254 deletions.
125 changes: 79 additions & 46 deletions daliuge-runtime/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,30 @@ def call_drop(self, sessionId, uid, method, *args):


class ZMQPubSubMixIn(object):
"""
ZeroMQ-based event publisher and subscriber.
Event publishing and event reception are done in their own separate
threads, where the externally-facing ZeroMQ sockets are created and used.
Events to be published are fed into the publishing thread via a thread-safe
Queue object (self._events_out), enabling any local thread to publish events
without having to worry about ZeroMQ thread-safety.
The event reception thread not only *receives* events, but also updates the
subscription socket to connect to new peers. These updates are fed via a
Queue object (self._subscriptions), enabling any local thread to indicate a
new peer to subscribe to in a thread-safe manner.
Note that we investigated not using Queue objects to communicate between
threads, and using inproc:// ZeroMQ sockets instead. This works, but at a
cost: all threads putting values into these sockets would need to check,
each time they use a socket in any manner, if the Context object is still
valid and hasn't been closed (or alternatively if self._pubsub_running is
still True). Our experience with this alternative was not satisfactory, and
therefore we went for a Queue-based thread communication model, making the
handling of ZeroMQ resources simpler.
"""

subscription = collections.namedtuple('subscription', 'endpoint finished_evt')

Expand All @@ -311,75 +335,61 @@ def __init__(self, host, events_port):
self._events_port = events_port

def start(self):

# temporarily timing import statements to check FS times on HPC environs
zmq = utils.timed_import('zmq')

self._pubsub_running = True
super(ZMQPubSubMixIn, self).start()
self._pubevts = Queue.Queue()
self._recvevts = Queue.Queue()
self._events_in = Queue.Queue()
self._events_out = Queue.Queue()
self._subscriptions = Queue.Queue()

# Setting up zeromq for event publishing/subscription
# They share the same context, there's no need for two separate ones
self._zmqctx = zmq.Context()

# We create the sockets in their respective threads to avoid
# multithreading issues with zmq, but still wait until they are created
# via these events
# Start the background threads, but wait until their sockets are created
timeout = 30
pubsock_created = threading.Event()
subsock_created = threading.Event()

self._zmqpubthread = threading.Thread(target = self._zmq_pub_thread, name="ZMQ evtpub", args=(pubsock_created,))
self._zmqpubthread.start()
if not pubsock_created.wait(timeout):
raise Exception("Failed to create PUB ZMQ socket in %d seconds" % (timeout,))

self._zmqsubthread = threading.Thread(target = self._zmq_sub_thread, name="ZMQ evtsub", args=(subsock_created,))
self._zmqsubthread.start()
if not subsock_created.wait(timeout):
raise Exception("Failed to create PUB ZMQ socket in %d seconds" % (timeout,))

self._zmqsubqthread = threading.Thread(target = self._zmq_sub_queue_thread, name="ZMQ evtsubq")
self._zmqsubqthread.start()
self._event_publisher = self._start_thread(self._publish_events, "Evt pub", timeout)
self._event_receiver = self._start_thread(self._receive_events, "Evt recv", timeout)
self._event_deliverer = self._start_thread(self._deliver_events, "Evt delivery")

# Start `target` in a new thread called `name` and return the Thread object.
#
# When `timeout` is truthy a threading.Event is created and passed to
# `target` as its only positional argument; this method then blocks for up
# to `timeout` seconds waiting for that event to be set, and raises an
# Exception if it is not set in time. When `timeout` is falsy (the default
# None) `target` is called without arguments and no waiting takes place.
def _start_thread(self, target, name, timeout=None):
# The readiness event only exists when the caller asked for a timeout
evt = threading.Event() if timeout else None
args = (evt,) if evt else ()
t = threading.Thread(target=target, name=name, args=args)
t.start()
# Block until the thread signals readiness; note the thread is left running
# (it is neither joined nor returned) when the wait times out
if evt and not evt.wait(timeout):
raise Exception('Failed to start %s thread in %d seconds' % (name, timeout))
return t

def shutdown(self):
super(ZMQPubSubMixIn, self).shutdown()
self._pubsub_running = False
self._zmqsubqthread.join()
self._zmqpubthread.join()
self._zmqsubthread.join()
self._zmqctx.destroy()
logger.info("ZMQ context used for event pub/sub destroyed")
self._event_deliverer.join()
self._event_publisher.join()
self._event_receiver.join()
logger.info("ZeroMQ event publisher/subscriber finished")

def publish_event(self, evt):
self._pubevts.put(evt)
self._events_out.put(evt)

def subscribe(self, host, port):
timeout = 5
finished_evt = threading.Event()
endpoint = "tcp://%s:%d" % (host, port)
endpoint = "tcp://%s:%d" % (utils.zmq_safe(host), port)
self._subscriptions.put(ZMQPubSubMixIn.subscription(endpoint, finished_evt))
if not finished_evt.wait(timeout):
raise DaliugeException("ZMQ subscription not achieved within %d seconds" % (timeout,))
logger.info("Subscribed for events originating from %s", endpoint)

def _zmq_pub_thread(self, sock_created):
def _publish_events(self, sock_created):
import zmq

pub = self._zmqctx.socket(zmq.PUB) # @UndefinedVariable
pub = self._context.socket(zmq.PUB) # @UndefinedVariable
pub.set_hwm(0) # Never drop messages that should be sent
endpoint = "tcp://%s:%d" % (utils.zmq_safe(self._events_host), self._events_port)
pub.bind(endpoint)
logger.info("Listening for events via ZeroMQ on %s", endpoint)
logger.info("Publishing events via ZeroMQ on %s", endpoint)
sock_created.set()

while self._pubsub_running:

try:
obj = self._pubevts.get_nowait()
obj = self._events_out.get_nowait()
except Queue.Empty:
time.sleep(0.01)
continue
Expand All @@ -392,42 +402,58 @@ def _zmq_pub_thread(self, sock_created):
logger.debug("Got an 'Again' when publishing event")
time.sleep(0.01)
continue
pub.close()

def _zmq_sub_queue_thread(self):
def _deliver_events(self):
while self._pubsub_running:
try:
evt = self._recvevts.get_nowait()
evt = self._events_in.get_nowait()
self.deliver_event(evt)
except Queue.Empty:
time.sleep(0.01)

def _zmq_sub_thread(self, sock_created):
def _receive_events(self, sock_created):
import zmq
from zmq.utils.monitor import recv_monitor_message

sub = self._zmqctx.socket(zmq.SUB) # @UndefinedVariable
sub = self._context.socket(zmq.SUB) # @UndefinedVariable
sub.setsockopt(zmq.SUBSCRIBE, six.b('')) # @UndefinedVariable
sub_monitor = sub.get_monitor_socket()
sock_created.set()

pending_connections = {}
while self._pubsub_running:

# A new subscription has been requested
try:
subscription = self._subscriptions.get_nowait()
sub.connect(subscription.endpoint)
subscription.finished_evt.set()
pending_connections[subscription.endpoint] = subscription.finished_evt
except Queue.Empty:
pass

try:
msg = recv_monitor_message(sub_monitor, flags=zmq.NOBLOCK)
if msg['event'] != zmq.EVENT_CONNECTED:
continue
finished_evt = pending_connections.pop(utils.b2s(msg['endpoint']))
finished_evt.set()
except zmq.error.Again:
pass

try:
evt = sub.recv_pyobj(flags = zmq.NOBLOCK) # @UndefinedVariable
self._recvevts.put(evt)
self._events_in.put(evt)
except zmq.error.Again:
time.sleep(0.01)
except Exception:
# Figure out what to do here
logger.exception("Something bad happened in %s:%d to ZMQ :'(", self._events_host, self._events_port)
break

sub_monitor.close()
sub.close()


# So far we currently support ZMQ only for event publishing
EventMixIn = ZMQPubSubMixIn
Expand All @@ -440,7 +466,14 @@ class NodeManager(EventMixIn, RpcMixIn, NodeManagerBase):
def __init__(self, useDLM=True, dlgPath=None, error_listener=None, event_listeners=[], max_threads=0,
host=None, rpc_port=constants.NODE_DEFAULT_RPC_PORT,
events_port=constants.NODE_DEFAULT_EVENTS_PORT):
# We "just know" that our RpcMixIn will have a create_context static
# method, which in reality means we are using the ZeroRPCServer class
self._context = RpcMixIn.create_context()
host = host or '127.0.0.1'
EventMixIn.__init__(self, host, events_port)
RpcMixIn.__init__(self, host, rpc_port)
NodeManagerBase.__init__(self, useDLM, dlgPath, error_listener, event_listeners, max_threads)
NodeManagerBase.__init__(self, useDLM, dlgPath, error_listener, event_listeners, max_threads)

# Shut down this manager: run the inherited shutdown chain first, then
# terminate the context held in self._context.
def shutdown(self):
super(NodeManager, self).shutdown()
# term() is called only after the inherited shutdown has returned
# NOTE(review): presumably so that sockets created from this context are
# already closed by then -- confirm against the mix-ins' shutdown methods
self._context.term()
Loading

0 comments on commit 21cbacb

Please sign in to comment.