diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 0fdc1963f..edb665b6b 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -23,6 +23,7 @@ import eventlet import greenlet +import qpid.codec010 as qpid_codec import qpid.messaging import qpid.messaging.exceptions @@ -63,6 +64,8 @@ cfg.CONF.register_opts(qpid_opts) +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + class ConsumerBase(object): """Consumer base class.""" @@ -117,10 +120,27 @@ def reconnect(self, session): self.receiver = session.receiver(self.address) self.receiver.capacity = 1 + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: + self._unpack_json_msg(message) self.callback(message.content) except Exception: LOG.exception(_("Failed to process message... skipping it.")) @@ -220,8 +240,35 @@ def reconnect(self, session): """Re-establish the Sender after a reconnection""" self.sender = session.sender(self.address) + def _pack_json_msg(self, msg): + """Qpid cannot serialize dicts containing strings longer than 65535 + characters. This function dumps the message content to a JSON + string, which Qpid is able to handle. + + :param msg: May be either a Qpid Message object or a bare dict. + :returns: A Qpid Message with its content field JSON encoded. + """ + try: + msg.content = jsonutils.dumps(msg.content) + except AttributeError: + # Need to have a Qpid message so we can set the content_type. + msg = qpid.messaging.Message(jsonutils.dumps(msg)) + msg.content_type = JSON_CONTENT_TYPE + return msg + def send(self, msg): """Send a message""" + try: + # Check if Qpid can encode the message + check_msg = msg + if not hasattr(check_msg, 'content_type'): + check_msg = qpid.messaging.Message(msg) + content_type = check_msg.content_type + enc, dec = qpid.messaging.message.get_codec(content_type) + enc(check_msg.content) + except qpid_codec.CodecException: + # This means the message couldn't be serialized as a dict. + msg = self._pack_json_msg(msg) self.sender.send(msg) diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 4e0cc2999..239844a46 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -30,6 +30,7 @@ from openstack.common import cfg from openstack.common import context +from openstack.common import jsonutils from openstack.common.rpc import amqp as rpc_amqp from openstack.common import testutils @@ -366,6 +367,92 @@ def test_call(self): def test_multicall(self): self._test_call(multi=True) + def _test_publisher(self, message=True): + """Test that messages containing long strings are correctly serialized + in a way that Qpid can handle. + + :param message: The publisher may be passed either a Qpid Message + object or a bare dict. This parameter controls which of those the test + will send. + """ + self.sent_msg = None + + def send_stub(msg): + self.sent_msg = msg + + # Qpid cannot serialize a dict containing a string > 65535 chars. + raw_msg = {'test': 'a' * 65536} + if message: + base_msg = qpid.messaging.Message(raw_msg) + else: + base_msg = raw_msg + expected_msg = qpid.messaging.Message(jsonutils.dumps(raw_msg)) + expected_msg.content_type = impl_qpid.JSON_CONTENT_TYPE + mock_session = self.mox.CreateMock(self.orig_session) + mock_sender = self.mox.CreateMock(self.orig_sender) + mock_session.sender(mox.IgnoreArg()).AndReturn(mock_sender) + self.stubs.Set(mock_sender, 'send', send_stub) + self.mox.ReplayAll() + + publisher = impl_qpid.Publisher(mock_session, 'test_node') + publisher.send(base_msg) + + self.assertEqual(self.sent_msg.content, expected_msg.content) + self.assertEqual(self.sent_msg.content_type, expected_msg.content_type) + + @testutils.skip_if(qpid is None, "Test requires qpid") + def test_publisher_long_message(self): + self._test_publisher(message=True) + + @testutils.skip_if(qpid is None, "Test requires qpid") + def test_publisher_long_dict(self): + self._test_publisher(message=False) + + def _test_consumer_long_message(self, json=True): + """Verify that the Qpid implementation correctly deserializes + message content. + + :param json: For compatibility, this code needs to support both + messages that are and are not JSON encoded. This param + specifies which is being tested. + """ + def fake_callback(msg): + self.received_msg = msg + + # The longest string Qpid can handle itself + chars = 65535 + if json: + # The first length that requires JSON encoding + chars = 65536 + raw_msg = {'test': 'a' * chars} + if json: + fake_message = qpid.messaging.Message(jsonutils.dumps(raw_msg)) + fake_message.content_type = impl_qpid.JSON_CONTENT_TYPE + else: + fake_message = qpid.messaging.Message(raw_msg) + mock_session = self.mox.CreateMock(self.orig_session) + mock_receiver = self.mox.CreateMock(self.orig_receiver) + mock_session.receiver(mox.IgnoreArg()).AndReturn(mock_receiver) + mock_receiver.fetch().AndReturn(fake_message) + mock_session.acknowledge(mox.IgnoreArg()) + self.mox.ReplayAll() + + consumer = impl_qpid.DirectConsumer(None, + mock_session, + 'bogus_msg_id', + fake_callback) + consumer.consume() + + self.assertEqual(self.received_msg, raw_msg) + + @testutils.skip_if(qpid is None, "Test requires qpid") + def test_consumer_long_message(self): + self._test_consumer_long_message(json=True) + + @testutils.skip_if(qpid is None, "Test requires qpid") + def test_consumer_long_message_no_json(self): + self._test_consumer_long_message(json=False) + # #from nova.tests.rpc import common