From 1af403ff66026b6616f683c6414851e51619088a Mon Sep 17 00:00:00 2001 From: Flavio Percoco Date: Fri, 6 Dec 2013 11:25:59 +0100 Subject: [PATCH] Sync rpc fix from oslo-incubator Sync the following fixes from oslo-incubator: ef406a2 Create a shared queue for QPID topic consumers e227c0e Properly reconnect subscribing clients when QPID broker restarts Closes-bug: #1251757 Closes-bug: #1257293 Change-Id: I917c7a6e5cb64c6383e74d0fb6e095ad17d4d478 --- nova/openstack/common/rpc/impl_qpid.py | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index f5af17d6e6c..a403e04a0f2 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -18,7 +18,6 @@ import functools import itertools import time -import uuid import eventlet import greenlet @@ -124,7 +123,6 @@ def __init__(self, conf, session, callback, node_name, node_opts, }, }, "link": { - "name": link_name, "durable": True, "x-declare": { "durable": False, @@ -139,6 +137,7 @@ def __init__(self, conf, session, callback, node_name, node_opts, "link": { "x-declare": { "auto-delete": True, + "exclusive": False, }, }, } @@ -146,6 +145,8 @@ def __init__(self, conf, session, callback, node_name, node_opts, raise_invalid_topology_version() addr_opts["link"]["x-declare"].update(link_opts) + if link_name: + addr_opts["link"]["name"] = link_name self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) @@ -220,14 +221,16 @@ def __init__(self, conf, session, msg_id, callback): if conf.qpid_topology_version == 1: node_name = "%s/%s" % (msg_id, msg_id) node_opts = {"type": "direct"} + link_name = msg_id elif conf.qpid_topology_version == 2: node_name = "amq.direct/%s" % msg_id node_opts = {} + link_name = None else: raise_invalid_topology_version() super(DirectConsumer, self).__init__(conf, session, callback, - node_name, node_opts, msg_id, + node_name, node_opts, link_name, link_opts) @@ -279,30 +282,16 @@ def __init__(self, conf, session, topic, callback): if conf.qpid_topology_version == 1: node_name = "%s_fanout" % topic node_opts = {"durable": False, "type": "fanout"} - link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex) elif conf.qpid_topology_version == 2: node_name = "amq.topic/fanout/%s" % topic node_opts = {} - link_name = "" else: raise_invalid_topology_version() super(FanoutConsumer, self).__init__(conf, session, callback, - node_name, node_opts, link_name, + node_name, node_opts, None, link_opts) - def reconnect(self, session): - topic = self.get_node_name().rpartition('_fanout')[0] - params = { - 'session': session, - 'topic': topic, - 'callback': self.callback, - } - - self.__init__(conf=self.conf, **params) - - super(FanoutConsumer, self).reconnect(session) - class Publisher(object): """Base Publisher class."""