diff --git a/doc/source/configuring.rst b/doc/source/configuring.rst index b9c168d2cd..cf1f40784e 100644 --- a/doc/source/configuring.rst +++ b/doc/source/configuring.rst @@ -670,6 +670,12 @@ Optional. Default: ``/`` Virtual host to use for connection when using ``rabbit`` strategy. +* ``rabbit_notification_exchange`` + +Optional. Default: ``glance`` + +Exchange name to use for connection when using ``rabbit`` strategy. + * ``rabbit_notification_topic`` Optional. Default: ``glance_notifications`` diff --git a/etc/glance-api.conf b/etc/glance-api.conf index 3851d4406e..e1c4d81c6e 100644 --- a/etc/glance-api.conf +++ b/etc/glance-api.conf @@ -84,6 +84,7 @@ rabbit_use_ssl = false rabbit_userid = guest rabbit_password = guest rabbit_virtual_host = / +rabbit_notification_exchange = glance rabbit_notification_topic = glance_notifications # ============ Filesystem Store Options ======================== diff --git a/glance/common/notifier.py b/glance/common/notifier.py index 5d5ed83396..983e65cf4d 100644 --- a/glance/common/notifier.py +++ b/glance/common/notifier.py @@ -16,11 +16,13 @@ # under the License. import datetime +import json import logging import socket import uuid import kombu.connection +import kombu.entity from glance.common import cfg from glance.common import exception @@ -68,6 +70,7 @@ class RabbitStrategy(object): cfg.StrOpt('rabbit_userid', default='guest'), cfg.StrOpt('rabbit_password', default='guest'), cfg.StrOpt('rabbit_virtual_host', default='/'), + cfg.StrOpt('rabbit_notification_exchange', default='glance'), cfg.StrOpt('rabbit_notification_topic', default='glance_notifications') ] @@ -76,20 +79,40 @@ def __init__(self, conf): self._conf = conf self._conf.register_opts(self.opts) + self.topic = self._conf.rabbit_notification_topic + self.connect() + + def connect(self): self.connection = kombu.connection.BrokerConnection( hostname=self._conf.rabbit_host, userid=self._conf.rabbit_userid, password=self._conf.rabbit_password, virtual_host=self._conf.rabbit_virtual_host, ssl=self._conf.rabbit_use_ssl) + self.channel = self.connection.channel() - self.topic = self._conf.rabbit_notification_topic + self.exchange = kombu.entity.Exchange( + channel=self.channel, + type="topic", + name=self._conf.rabbit_notification_exchange) + self.exchange.declare() def _send_message(self, message, priority): - topic = "%s.%s" % (self.topic, priority) - queue = self.connection.SimpleQueue(topic) - queue.put(message, serializer="json") - queue.close() + routing_key = "%s.%s" % (self.topic, priority.lower()) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue( + channel=self.channel, + exchange=self.exchange, + durable=True, + name=routing_key, + routing_key=routing_key) + queue.declare() + + msg = self.exchange.Message(json.dumps(message)) + self.exchange.publish(msg, routing_key=routing_key) def warn(self, msg): self._send_message(msg, "WARN") diff --git a/glance/tests/unit/test_notifier.py b/glance/tests/unit/test_notifier.py index a26114568e..824bf288c4 100644 --- a/glance/tests/unit/test_notifier.py +++ b/glance/tests/unit/test_notifier.py @@ -86,6 +86,7 @@ class TestRabbitNotifier(unittest.TestCase): def setUp(self): notifier.RabbitStrategy._send_message = self._send_message + notifier.RabbitStrategy.connect = lambda s: None self.called = False conf = utils.TestConfigOpts({"notifier_strategy": "rabbit"}) self.notifier = notifier.Notifier(conf)