From 87f22bd887b73b9d6cb08b98db9d93d067afd137 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Thu, 5 Dec 2013 11:34:38 -0700 Subject: [PATCH] Properly reconnect subscribing clients when QPID broker restarts This is a cherrypick of oslo-incubator: e227c0ed7e0ed1f9b8d029336f8aeb60e38c23df From the oslo-incubator bug: When the QPID broker is restarted (or fails over), subscribed clients will attempt to re-establish their connections. In the case of fanout subscriptions, this reconnection functionality is broken. For version 1 topologies, the clients attempt to reconnect twice to the same exclusive address - which is illegal. In the case of version 2 topologies, the address parsing is broken and an illegal address is created on reconnect. This fix avoids the problem by removing the special-case reconnect code that manages UUID addresses; it is unnecessary as the QPID broker will generate unique queue names automatically when the clients reconnect. Change-Id: I563db129e63817ad55165e318f164f06b141ee33 Closes-Bug: 1251757 --- heat/openstack/common/rpc/impl_qpid.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index 34be1c5a0ba..ad0a4b5a46b 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -18,7 +18,6 @@ import functools import itertools import time -import uuid import eventlet import greenlet @@ -124,7 +123,6 @@ def __init__(self, conf, session, callback, node_name, node_opts, }, }, "link": { - "name": link_name, "durable": True, "x-declare": { "durable": False, @@ -133,6 +131,8 @@ def __init__(self, conf, session, callback, node_name, node_opts, }, }, } + if link_name: + addr_opts["link"]["name"] = link_name addr_opts["node"]["x-declare"].update(node_opts) elif conf.qpid_topology_version == 2: addr_opts = { @@ -279,30 +279,16 @@ def __init__(self, conf, session, topic, callback): if conf.qpid_topology_version == 1: node_name = "%s_fanout" % topic node_opts = {"durable": False, "type": "fanout"} - link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex) elif conf.qpid_topology_version == 2: node_name = "amq.topic/fanout/%s" % topic node_opts = {} - link_name = "" else: raise_invalid_topology_version() super(FanoutConsumer, self).__init__(conf, session, callback, - node_name, node_opts, link_name, + node_name, node_opts, None, link_opts) - def reconnect(self, session): - topic = self.get_node_name().rpartition('_fanout')[0] - params = { - 'session': session, - 'topic': topic, - 'callback': self.callback, - } - - self.__init__(conf=self.conf, **params) - - super(FanoutConsumer, self).reconnect(session) - class Publisher(object): """Base Publisher class."""