Skip to content

Commit

Permalink
Properly reconnect subscribing clients when QPID broker restarts
Browse files Browse the repository at this point in the history
This is a cherrypick of oslo-incubator:
e227c0ed7e0ed1f9b8d029336f8aeb60e38c23df

From the oslo-incubator bug:
When the QPID broker is restarted (or fails over), subscribed clients
will attempt to re-establish their connections.  In the case of fanout
subscriptions, this reconnection functionality is broken. For version
1 topologies, the clients attempt to reconnect twice to the same
exclusive address - which is illegal.  In the case of version 2
topologies, the address parsing is broken and an illegal address is
created on reconnect.  This fix avoids the problem by removing the
special-case reconnect code that manages UUID addresses; it is
unnecessary as the QPID broker will generate unique queue names
automatically when the clients reconnect.

Change-Id: I563db129e63817ad55165e318f164f06b141ee33
Closes-Bug: 1251757
  • Loading branch information
Steven Dake committed Dec 5, 2013
1 parent 12fa115 commit 87f22bd
Showing 1 changed file with 3 additions and 17 deletions.
20 changes: 3 additions & 17 deletions heat/openstack/common/rpc/impl_qpid.py
Expand Up @@ -18,7 +18,6 @@
import functools
import itertools
import time
import uuid

import eventlet
import greenlet
Expand Down Expand Up @@ -124,7 +123,6 @@ def __init__(self, conf, session, callback, node_name, node_opts,
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
Expand All @@ -133,6 +131,8 @@ def __init__(self, conf, session, callback, node_name, node_opts,
},
},
}
if link_name:
addr_opts["link"]["name"] = link_name
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
Expand Down Expand Up @@ -279,30 +279,16 @@ def __init__(self, conf, session, topic, callback):
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()

super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
node_name, node_opts, None,
link_opts)

def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}

self.__init__(conf=self.conf, **params)

super(FanoutConsumer, self).reconnect(session)


class Publisher(object):
"""Base Publisher class."""
Expand Down

0 comments on commit 87f22bd

Please sign in to comment.