Navigation Menu

Skip to content

Commit

Permalink
Sync rpc fix from oslo-incubator
Browse files Browse the repository at this point in the history
Sync the following fixes from oslo-incubator:

ef406a2 Create a shared queue for QPID topic consumers
e227c0e Properly reconnect subscribing clients when QPID broker restarts

Closes-bug: #1251757
Closes-bug: #1257293

Change-Id: I917c7a6e5cb64c6383e74d0fb6e095ad17d4d478
  • Loading branch information
flaper87 committed Dec 6, 2013
1 parent 164d0b0 commit 1af403f
Showing 1 changed file with 7 additions and 18 deletions.
25 changes: 7 additions & 18 deletions nova/openstack/common/rpc/impl_qpid.py
Expand Up @@ -18,7 +18,6 @@
import functools
import itertools
import time
import uuid

import eventlet
import greenlet
Expand Down Expand Up @@ -124,7 +123,6 @@ def __init__(self, conf, session, callback, node_name, node_opts,
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
Expand All @@ -139,13 +137,16 @@ def __init__(self, conf, session, callback, node_name, node_opts,
"link": {
"x-declare": {
"auto-delete": True,
"exclusive": False,
},
},
}
else:
raise_invalid_topology_version()

addr_opts["link"]["x-declare"].update(link_opts)
if link_name:
addr_opts["link"]["name"] = link_name

self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))

Expand Down Expand Up @@ -220,14 +221,16 @@ def __init__(self, conf, session, msg_id, callback):
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
link_name = msg_id
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
link_name = None
else:
raise_invalid_topology_version()

super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
node_name, node_opts, link_name,
link_opts)


Expand Down Expand Up @@ -279,30 +282,16 @@ def __init__(self, conf, session, topic, callback):
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()

super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
node_name, node_opts, None,
link_opts)

def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}

self.__init__(conf=self.conf, **params)

super(FanoutConsumer, self).reconnect(session)


class Publisher(object):
"""Base Publisher class."""
Expand Down

0 comments on commit 1af403f

Please sign in to comment.