Skip to content

Commit

Permalink
Properly reconnect subscribing clients when QPID broker restarts
Browse files Browse the repository at this point in the history
When the QPID broker is restarted (or fails over), subscribed clients
will attempt to re-establish their connections.  In the case of fanout
subscriptions, this reconnection functionality is broken. For version
1 topologies, the clients attempt to reconnect twice to the same
exclusive address - which is illegal.  In the case of version 2
topologies, the address parsing is broken and an illegal address is
created on reconnect.  This fix avoids the problem by removing the
special-case reconnect code that manages UUID addresses; it is
unnecessary as the QPID broker will generate unique queue names
automatically when the clients reconnect.

Closes-bug: #1251757
Change-Id: I6051fb503663bb8c7c5468db6bcde10f6cf1b318
  • Loading branch information
kgiusti committed Dec 2, 2013
1 parent 86b0750 commit ffa5c07
Showing 1 changed file with 3 additions and 17 deletions.
20 changes: 3 additions & 17 deletions oslo/messaging/_drivers/impl_qpid.py
Expand Up @@ -19,7 +19,6 @@
import itertools
import logging
import time
import uuid

import eventlet
import greenlet
Expand Down Expand Up @@ -125,7 +124,6 @@ def __init__(self, conf, session, callback, node_name, node_opts,
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
Expand All @@ -134,6 +132,8 @@ def __init__(self, conf, session, callback, node_name, node_opts,
},
},
}
if link_name:
addr_opts["link"]["name"] = link_name
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
Expand Down Expand Up @@ -280,30 +280,16 @@ def __init__(self, conf, session, topic, callback):
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version(conf)

super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
node_name, node_opts, None,
link_opts)

def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}

self.__init__(conf=self.conf, **params)

super(FanoutConsumer, self).reconnect(session)


class Publisher(object):
"""Base Publisher class."""
Expand Down

0 comments on commit ffa5c07

Please sign in to comment.