Skip to content

Commit

Permalink
Sync Qpid RPC fix from Oslo
Browse files Browse the repository at this point in the history
Qpid cannot serialize dicts containing strings longer than 65535
characters.  This change syncs the fix from Oslo to Quantum.

Fixes bug 1175808

Change-Id: I48071abffa86e71727deed05aca08ac475cbaf05
  • Loading branch information
Ben Nemec committed Jun 7, 2013
1 parent f83931a commit 68a5a38
Showing 1 changed file with 82 additions and 29 deletions.
111 changes: 82 additions & 29 deletions quantum/openstack/common/rpc/impl_qpid.py
Expand Up @@ -31,6 +31,7 @@
from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common

qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")

Expand Down Expand Up @@ -69,6 +70,8 @@

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


class ConsumerBase(object):
"""Consumer base class."""
Expand Down Expand Up @@ -118,15 +121,32 @@ def __init__(self, session, callback, node_name, node_opts,
self.reconnect(session)

def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect"""
"""Re-declare the receiver after a qpid reconnect."""
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1

def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'

def consume(self):
"""Fetch the message and pass it to the callback object"""
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
Expand All @@ -139,7 +159,7 @@ def get_receiver(self):


class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
"""Queue/consumer class for 'direct'."""

def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
Expand All @@ -157,7 +177,7 @@ def __init__(self, conf, session, msg_id, callback):


class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
"""Consumer class for 'topic'."""

def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
Expand All @@ -177,7 +197,7 @@ def __init__(self, conf, session, topic, callback, name=None,


class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
"""Consumer class for 'fanout'."""

def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
Expand All @@ -196,7 +216,7 @@ def __init__(self, conf, session, topic, callback):


class Publisher(object):
"""Base Publisher class"""
"""Base Publisher class."""

def __init__(self, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
Expand Down Expand Up @@ -225,24 +245,51 @@ def __init__(self, session, node_name, node_opts=None):
self.reconnect(session)

def reconnect(self, session):
"""Re-establish the Sender after a reconnection"""
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)

def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid_messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg

def send(self, msg):
"""Send a message"""
"""Send a message."""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid_messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid_messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg)


class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
"""Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})


class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
"""Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
Expand All @@ -252,7 +299,7 @@ def __init__(self, conf, session, topic):


class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
Expand All @@ -262,7 +309,7 @@ def __init__(self, conf, session, topic):


class NotifyPublisher(Publisher):
"""Publisher class for notifications"""
"""Publisher class for notifications."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
Expand Down Expand Up @@ -330,7 +377,7 @@ def _lookup_consumer(self, receiver):
return self.consumers[str(receiver)]

def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
"""Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
Expand Down Expand Up @@ -381,14 +428,20 @@ def ensure(self, error_callback, method, *args, **kwargs):
self.reconnect()

def close(self):
"""Close/release this connection"""
"""Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close()
try:
self.connection.close()
except Exception:
# NOTE(dripton) Logging exceptions that happen during cleanup just
# causes confusion; there's really nothing useful we can do with
# them.
pass
self.connection = None

def reset(self):
"""Reset a connection so it can be used again"""
"""Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
Expand All @@ -412,7 +465,7 @@ def _declare_consumer():
return self.ensure(_connect_error, _declare_consumer)

def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
"""Return an iterator that will consume from all queues/consumers."""

def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
Expand All @@ -436,7 +489,7 @@ def _consume():
yield self.ensure(_error_callback, _consume)

def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
"""Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
Expand All @@ -451,7 +504,7 @@ def wait_on_proxy_callbacks(self):
proxy_cb.wait()

def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
"""Send to a publisher based on the publisher class."""

def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
Expand Down Expand Up @@ -481,15 +534,15 @@ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
topic, callback)

def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
"""Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)

def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
"""Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)

def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
"""Send a 'topic' message."""
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
Expand All @@ -504,15 +557,15 @@ def topic_send(self, topic, msg, timeout=None):
self.publisher_send(TopicPublisher, topic, qpid_message)

def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)

def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg)

def consume(self, limit=None):
"""Consume from all queues/consumers"""
"""Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
Expand All @@ -521,7 +574,7 @@ def consume(self, limit=None):
return

def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
"""Consumer from all queues/consumers in a greenthread."""
def _consumer_thread():
try:
self.consume()
Expand All @@ -532,7 +585,7 @@ def _consumer_thread():
return self.consumer_thread

def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
"""Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
Expand All @@ -548,7 +601,7 @@ def create_consumer(self, topic, proxy, fanout=False):
return consumer

def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
"""Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
Expand Down Expand Up @@ -591,7 +644,7 @@ def join_consumer_pool(self, callback, pool_name, topic,


def create_connection(conf, new=True):
"""Create a connection"""
"""Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
Expand Down

0 comments on commit 68a5a38

Please sign in to comment.