Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Support a new qpid topology
Browse files Browse the repository at this point in the history
There has been a bug open for a while pointing out that the way we
create direct exchanges with qpid results in leaking exchanges since
qpid doesn't support auto-deleting exchanges.  This was somewhat
mitigated by change to use a single reply queue.  This meant we created
far fewer direct exchanges, but the problem persists anyway.

A Qpid expert, William Henry, originally proposed a change to address
this issue.  Unfortunately, it wasn't backwards compatible with existing
installations.  This patch takes the same approach, but makes it
optional and off by default.  This will allow a migration period.

As a really nice side effect, the Qpid experts have told us that this
change will also allow us to use Qpid broker federation to provide HA.

DocImpact
Closes-bug: #1178375
Co-authored-by: William Henry <whenry@redhat.com>
Change-Id: I09b8317c0d8a298237beeb3105f2b90cb13933d8
  • Loading branch information
russellb committed Aug 30, 2013
1 parent f08607c commit 76972e2
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 117 deletions.
227 changes: 160 additions & 67 deletions openstack/common/rpc/impl_qpid.py
Expand Up @@ -67,17 +67,35 @@
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
"federation to work. Users should update to version 2 "
"when they are able to take everything down, as it "
"requires a clean break."),
]

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
conf.qpid_topology_version)
LOG.error(msg)
raise Exception(msg)


class ConsumerBase(object):
"""Consumer base class."""

def __init__(self, session, callback, node_name, node_opts,
def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
Expand All @@ -95,26 +113,38 @@ def __init__(self, session, callback, node_name, node_opts,
self.receiver = None
self.session = None

addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True,
},
},
"link": {
"name": link_name,
"durable": True,
"auto-delete": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
},
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
}
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
"link": {
"x-declare": {
"auto-delete": True,
},
},
},
}
addr_opts["node"]["x-declare"].update(node_opts)
}
else:
raise_invalid_topology_version()

addr_opts["link"]["x-declare"].update(link_opts)

self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
Expand Down Expand Up @@ -181,16 +211,24 @@ def __init__(self, conf, session, msg_id, callback):
'callback' is the callback to call when messages are received
"""

super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
}

if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()

super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
link_opts)


class TopicConsumer(ConsumerBase):
Expand All @@ -208,14 +246,20 @@ def __init__(self, conf, session, topic, callback, name=None,
"""

exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
}

if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()

super(TopicConsumer, self).__init__(conf, session, callback, node_name,
{}, name or topic, link_opts)


class FanoutConsumer(ConsumerBase):
Expand All @@ -230,12 +274,22 @@ def __init__(self, conf, session, topic, callback):
"""
self.conf = conf

super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
link_opts = {"exclusive": True}

if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()

super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
link_opts)

def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
Expand All @@ -253,29 +307,34 @@ def reconnect(self, session):
class Publisher(object):
"""Base Publisher class."""

def __init__(self, session, node_name, node_opts=None):
def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session

addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
},
},
},
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)

self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
elif conf.qpid_topology_version == 2:
self.address = node_name
else:
raise_invalid_topology_version()

self.reconnect(session)

Expand Down Expand Up @@ -319,8 +378,18 @@ class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "direct"})

if conf.qpid_topology_version == 1:
node_name = msg_id
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()

super(DirectPublisher, self).__init__(conf, session, node_name,
node_opts)


class TopicPublisher(Publisher):
Expand All @@ -329,18 +398,34 @@ def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic))

if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()

super(TopicPublisher, self).__init__(conf, session, node_name)


class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
super(FanoutPublisher, self).__init__(
session,
"%s_fanout" % topic, {"type": "fanout"})

if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version()

super(FanoutPublisher, self).__init__(conf, session, node_name,
node_opts)


class NotifyPublisher(Publisher):
Expand All @@ -349,9 +434,17 @@ def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic),
{"durable": True})
node_opts = {"durable": True}

if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()

super(NotifyPublisher, self).__init__(conf, session, node_name,
node_opts)


class Connection(object):
Expand Down

0 comments on commit 76972e2

Please sign in to comment.