Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Fix problem with long messages in Qpid
Browse files Browse the repository at this point in the history
Qpid has a limitation where it cannot serialize a dict containing a
string greater than 65535 characters. This change alters the Qpid
implementation to JSON encode the dict before sending it, but only if
Qpid would fail to serialize it. This maintains as much backward
compatibility as possible, though long messages will still fail if they
are sent to an older receiver.

Even though this change will modify the message format, it will only do
it when messages are longer than 65K which would be broken anyway and
could cause serious bugs like the one linked below.

Fixes bug 1215091

(cherry picked from commit 7ce5441)

Conflicts:
	openstack/common/rpc/impl_qpid.py

Change-Id: I2f0e88435748bab631d969573d3a598d9e1f7fef
  • Loading branch information
Ben Nemec authored and Xavier Queralt committed Sep 2, 2013
1 parent 3fed731 commit 478ac3a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 0 deletions.
47 changes: 47 additions & 0 deletions openstack/common/rpc/impl_qpid.py
Expand Up @@ -23,6 +23,7 @@

import eventlet
import greenlet
import qpid.codec010 as qpid_codec
import qpid.messaging
import qpid.messaging.exceptions

Expand Down Expand Up @@ -63,6 +64,8 @@

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


class ConsumerBase(object):
"""Consumer base class."""
Expand Down Expand Up @@ -117,10 +120,27 @@ def reconnect(self, session):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1

def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'

def consume(self):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
self.callback(message.content)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
Expand Down Expand Up @@ -220,8 +240,35 @@ def reconnect(self, session):
"""Re-establish the Sender after a reconnection"""
self.sender = session.sender(self.address)

def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid.messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg

def send(self, msg):
"""Send a message"""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid.messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid.messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg)


Expand Down
87 changes: 87 additions & 0 deletions tests/unit/rpc/test_qpid.py
Expand Up @@ -30,6 +30,7 @@

from openstack.common import cfg
from openstack.common import context
from openstack.common import jsonutils
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common import testutils

Expand Down Expand Up @@ -366,6 +367,92 @@ def test_call(self):
def test_multicall(self):
self._test_call(multi=True)

def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized
in a way that Qpid can handle.
:param message: The publisher may be passed either a Qpid Message
object or a bare dict. This parameter controls which of those the test
will send.
"""
self.sent_msg = None

def send_stub(msg):
self.sent_msg = msg

# Qpid cannot serialize a dict containing a string > 65535 chars.
raw_msg = {'test': 'a' * 65536}
if message:
base_msg = qpid.messaging.Message(raw_msg)
else:
base_msg = raw_msg
expected_msg = qpid.messaging.Message(jsonutils.dumps(raw_msg))
expected_msg.content_type = impl_qpid.JSON_CONTENT_TYPE
mock_session = self.mox.CreateMock(self.orig_session)
mock_sender = self.mox.CreateMock(self.orig_sender)
mock_session.sender(mox.IgnoreArg()).AndReturn(mock_sender)
self.stubs.Set(mock_sender, 'send', send_stub)
self.mox.ReplayAll()

publisher = impl_qpid.Publisher(mock_session, 'test_node')
publisher.send(base_msg)

self.assertEqual(self.sent_msg.content, expected_msg.content)
self.assertEqual(self.sent_msg.content_type, expected_msg.content_type)

@testutils.skip_if(qpid is None, "Test requires qpid")
def test_publisher_long_message(self):
self._test_publisher(message=True)

@testutils.skip_if(qpid is None, "Test requires qpid")
def test_publisher_long_dict(self):
self._test_publisher(message=False)

def _test_consumer_long_message(self, json=True):
"""Verify that the Qpid implementation correctly deserializes
message content.
:param json: For compatibility, this code needs to support both
messages that are and are not JSON encoded. This param
specifies which is being tested.
"""
def fake_callback(msg):
self.received_msg = msg

# The longest string Qpid can handle itself
chars = 65535
if json:
# The first length that requires JSON encoding
chars = 65536
raw_msg = {'test': 'a' * chars}
if json:
fake_message = qpid.messaging.Message(jsonutils.dumps(raw_msg))
fake_message.content_type = impl_qpid.JSON_CONTENT_TYPE
else:
fake_message = qpid.messaging.Message(raw_msg)
mock_session = self.mox.CreateMock(self.orig_session)
mock_receiver = self.mox.CreateMock(self.orig_receiver)
mock_session.receiver(mox.IgnoreArg()).AndReturn(mock_receiver)
mock_receiver.fetch().AndReturn(fake_message)
mock_session.acknowledge(mox.IgnoreArg())
self.mox.ReplayAll()

consumer = impl_qpid.DirectConsumer(None,
mock_session,
'bogus_msg_id',
fake_callback)
consumer.consume()

self.assertEqual(self.received_msg, raw_msg)

@testutils.skip_if(qpid is None, "Test requires qpid")
def test_consumer_long_message(self):
self._test_consumer_long_message(json=True)

@testutils.skip_if(qpid is None, "Test requires qpid")
def test_consumer_long_message_no_json(self):
self._test_consumer_long_message(json=False)


#
#from nova.tests.rpc import common
Expand Down

0 comments on commit 478ac3a

Please sign in to comment.