Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
zmq-receiver msg forwarding in greenthreads
Browse files Browse the repository at this point in the history
Because PUSH sockets can block and there
may not be PULL consumers, the oslo-zmq-receiver
process may stall in certain senarios.

Improves error handling in ZmqProxy.

This addresses bug 1097856.

Additionally, this bug seems to improve
(but not necessarily fix) the behavior
reported in bug 1065532.

Change-Id: I6df6035a6676c5bcdddaec7a332ac77e621ba9f3
  • Loading branch information
Eric Windisch committed Jan 11, 2013
1 parent bd1e5a3 commit ab04310
Showing 1 changed file with 48 additions and 20 deletions.
68 changes: 48 additions & 20 deletions openstack/common/rpc/impl_zmq.py
Expand Up @@ -61,6 +61,10 @@
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),

cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),

cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),

Expand Down Expand Up @@ -413,12 +417,6 @@ def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)

self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir

self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])

def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
Expand All @@ -444,20 +442,50 @@ def consume(self, sock):
sock_type = zmq.PUSH

if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)

# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)

LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)

try:
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return

self.topic_proxy[topic] = eventlet.queue.LightQueue(
CONF.rpc_zmq_topic_backlog)
self.sockets.append(out_sock)

# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)

waiter.send(True)

while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})

wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)

try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return

try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})


class ZmqReactor(ZmqBaseReactor):
Expand Down

0 comments on commit ab04310

Please sign in to comment.