diff --git a/cinder/openstack/common/rpc/amqp.py b/cinder/openstack/common/rpc/amqp.py index 9addfa1c763..587d0f91e40 100644 --- a/cinder/openstack/common/rpc/amqp.py +++ b/cinder/openstack/common/rpc/amqp.py @@ -46,12 +46,20 @@ from cinder.openstack.common.rpc import common as rpc_common -# TODO(pekowski): Remove this option in Havana. amqp_opts = [ + # TODO(pekowski): Remove this option in Havana. cfg.BoolOpt('amqp_rpc_single_reply_queue', default=False, help='Enable a fast single reply queue if using AMQP based ' 'RPC like RabbitMQ or Qpid.'), + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), ] cfg.CONF.register_opts(amqp_opts) diff --git a/cinder/openstack/common/rpc/impl_kombu.py b/cinder/openstack/common/rpc/impl_kombu.py index 681f531843d..424a61c0a1c 100644 --- a/cinder/openstack/common/rpc/impl_kombu.py +++ b/cinder/openstack/common/rpc/impl_kombu.py @@ -82,9 +82,6 @@ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -233,9 +230,9 @@ def __init__(self, conf, channel, topic, callback, tag, name=None, Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -339,8 +336,8 @@ def __init__(self, conf, channel, topic, **kwargs): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -370,7 +367,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) diff --git a/cinder/openstack/common/rpc/impl_qpid.py b/cinder/openstack/common/rpc/impl_qpid.py index 8f52fb84f79..1137278d8df 100644 --- a/cinder/openstack/common/rpc/impl_qpid.py +++ b/cinder/openstack/common/rpc/impl_qpid.py @@ -65,15 +65,33 @@ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), + # NOTE(russellb) If any additional versions are added (beyond 1 and 2), + # this file could probably use some additional refactoring so that the + # differences between each version are split into different classes. + cfg.IntOpt('qpid_topology_version', + default=1, + help="The qpid topology version to use. Version 1 is what " + "was originally used by impl_qpid. Version 2 includes " + "some backwards-incompatible changes that allow broker " + "federation to work. Users should update to version 2 " + "when they are able to take everything down, as it " + "requires a clean break."), ] cfg.CONF.register_opts(qpid_opts) +def raise_invalid_topology_version(conf): + msg = (_("Invalid value for qpid_topology_version: %d") % + conf.qpid_topology_version) + LOG.error(msg) + raise Exception(msg) + + class ConsumerBase(object): """Consumer base class.""" - def __init__(self, session, callback, node_name, node_opts, + def __init__(self, conf, session, callback, node_name, node_opts, link_name, link_opts): """Declare a queue on an amqp session. @@ -91,26 +109,38 @@ def __init__(self, session, callback, node_name, node_opts, self.receiver = None self.session = None - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { + if conf.qpid_topology_version == 1: + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": True, + "auto-delete": True, + }, + }, + "link": { + "name": link_name, "durable": True, - "auto-delete": True, + "x-declare": { + "durable": False, + "auto-delete": True, + "exclusive": False, + }, }, - }, - "link": { - "name": link_name, - "durable": True, - "x-declare": { - "durable": False, - "auto-delete": True, - "exclusive": False, + } + addr_opts["node"]["x-declare"].update(node_opts) + elif conf.qpid_topology_version == 2: + addr_opts = { + "link": { + "x-declare": { + "auto-delete": True, + }, }, - }, - } - addr_opts["node"]["x-declare"].update(node_opts) + } + else: + raise_invalid_topology_version() + addr_opts["link"]["x-declare"].update(link_opts) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) @@ -149,11 +179,24 @@ def __init__(self, conf, session, msg_id, callback): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + link_opts = { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + } + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (msg_id, msg_id) + node_opts = {"type": "direct"} + elif conf.qpid_topology_version == 2: + node_name = "amq.direct/%s" % msg_id + node_opts = {} + else: + raise_invalid_topology_version() + + super(DirectConsumer, self).__init__(conf, session, callback, + node_name, node_opts, msg_id, + link_opts) class TopicConsumer(ConsumerBase): @@ -171,9 +214,20 @@ def __init__(self, conf, session, topic, callback, name=None, """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + link_opts = { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + } + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(TopicConsumer, self).__init__(conf, session, callback, node_name, + {}, name or topic, link_opts) class FanoutConsumer(ConsumerBase): @@ -187,40 +241,55 @@ def __init__(self, conf, session, topic, callback): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__( - session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + link_opts = {"exclusive": True} + + if conf.qpid_topology_version == 1: + node_name = "%s_fanout" % topic + node_opts = {"durable": False, "type": "fanout"} + link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/fanout/%s" % topic + node_opts = {} + link_name = "" + else: + raise_invalid_topology_version() + + super(FanoutConsumer, self).__init__(conf, session, callback, + node_name, node_opts, link_name, + link_opts) class Publisher(object): """Base Publisher class""" - def __init__(self, session, node_name, node_opts=None): + def __init__(self, conf, session, node_name, node_opts=None): """Init the Publisher class with the exchange_name, routing_key, and other options """ self.sender = None self.session = session - addr_opts = { - "create": "always", - "node": { - "type": "topic", - "x-declare": { - "durable": False, - # auto-delete isn't implemented for exchanges in qpid, - # but put in here anyway - "auto-delete": True, + if conf.qpid_topology_version == 1: + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": False, + # auto-delete isn't implemented for exchanges in qpid, + # but put in here anyway + "auto-delete": True, + }, }, - }, - } - if node_opts: - addr_opts["node"]["x-declare"].update(node_opts) + } + if node_opts: + addr_opts["node"]["x-declare"].update(node_opts) - self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + elif conf.qpid_topology_version == 2: + self.address = node_name + else: + raise_invalid_topology_version() self.reconnect(session) @@ -237,8 +306,17 @@ class DirectPublisher(Publisher): """Publisher class for 'direct'""" def __init__(self, conf, session, msg_id): """Init a 'direct' publisher.""" - super(DirectPublisher, self).__init__(session, msg_id, - {"type": "Direct"}) + if conf.qpid_topology_version == 1: + node_name = msg_id + node_opts = {"type": "direct"} + elif conf.qpid_topology_version == 2: + node_name = "amq.direct/%s" % msg_id + node_opts = {} + else: + raise_invalid_topology_version() + + super(DirectPublisher, self).__init__(conf, session, node_name, + node_opts) class TopicPublisher(Publisher): @@ -247,8 +325,15 @@ def __init__(self, conf, session, topic): """init a 'topic' publisher. """ exchange_name = rpc_amqp.get_control_exchange(conf) - super(TopicPublisher, self).__init__(session, - "%s/%s" % (exchange_name, topic)) + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(TopicPublisher, self).__init__(conf, session, node_name) class FanoutPublisher(Publisher): @@ -256,9 +341,18 @@ class FanoutPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__( - session, - "%s_fanout" % topic, {"type": "fanout"}) + + if conf.qpid_topology_version == 1: + node_name = "%s_fanout" % topic + node_opts = {"type": "fanout"} + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/fanout/%s" % topic + node_opts = {} + else: + raise_invalid_topology_version() + + super(FanoutPublisher, self).__init__(conf, session, node_name, + node_opts) class NotifyPublisher(Publisher): @@ -267,9 +361,17 @@ def __init__(self, conf, session, topic): """init a 'topic' publisher. """ exchange_name = rpc_amqp.get_control_exchange(conf) - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (exchange_name, topic), - {"durable": True}) + node_opts = {"durable": True} + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(NotifyPublisher, self).__init__(conf, session, node_name, + node_opts) class Connection(object): diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index 8ce6c2555bf..7b39e2c9617 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -776,6 +776,12 @@ # like RabbitMQ or Qpid. (boolean value) #amqp_rpc_single_reply_queue=false +# Use durable queues in amqp. (boolean value) +#amqp_durable_queues=false + +# Auto-delete queues in amqp. (boolean value) +#amqp_auto_delete=false + # # Options defined in cinder.openstack.common.rpc.impl_kombu @@ -831,9 +837,6 @@ # value) #rabbit_max_retries=0 -# use durable queues in RabbitMQ (boolean value) -#rabbit_durable_queues=false - # use H/A queues in RabbitMQ (x-ha-policy: all).You need to # wipe RabbitMQ database when changing this option. (boolean # value) @@ -873,6 +876,14 @@ # Disable Nagle algorithm (boolean value) #qpid_tcp_nodelay=true +# The qpid topology version to use. Version 1 is what was +# originally used by impl_qpid. Version 2 includes some +# backwards-incompatible changes that allow broker federation +# to work. Users should update to version 2 when they are +# able to take everything down, as it requires a clean break. +# (integer value) +#qpid_topology_version=1 + # # Options defined in cinder.openstack.common.rpc.impl_zmq @@ -1651,4 +1662,4 @@ #volume_dd_blocksize=1M -# Total option count: 353 +# Total option count: 355