Skip to content

Commit

Permalink
Update rpc/impl_qpid.py from oslo
Browse files Browse the repository at this point in the history
The current qpid driver cannot serialize objects containing strings
longer than 65535 characters.  This just became a breaking issue when
the message to scheduler_run_instance went over that limit.  The fix has
been commited to oslo, so this just syncs it over to Nova.

Bug 1175808
Bug 1187595

Change-Id: If95c11a7e03c81d89133f6cad0dcbb6d8acb8148
  • Loading branch information
Andrew Laski committed Jun 5, 2013
1 parent 17cbb83 commit 781a8f9
Showing 1 changed file with 82 additions and 29 deletions.
111 changes: 82 additions & 29 deletions nova/openstack/common/rpc/impl_qpid.py
Expand Up @@ -31,6 +31,7 @@
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common

qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")

Expand Down Expand Up @@ -69,6 +70,8 @@

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


class ConsumerBase(object):
"""Consumer base class."""
Expand Down Expand Up @@ -118,15 +121,32 @@ def __init__(self, session, callback, node_name, node_opts,
self.reconnect(session)

def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect"""
"""Re-declare the receiver after a qpid reconnect."""
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1

def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'

def consume(self):
"""Fetch the message and pass it to the callback object"""
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
Expand All @@ -139,7 +159,7 @@ def get_receiver(self):


class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
"""Queue/consumer class for 'direct'."""

def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
Expand All @@ -157,7 +177,7 @@ def __init__(self, conf, session, msg_id, callback):


class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
"""Consumer class for 'topic'."""

def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
Expand All @@ -177,7 +197,7 @@ def __init__(self, conf, session, topic, callback, name=None,


class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
"""Consumer class for 'fanout'."""

def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
Expand All @@ -196,7 +216,7 @@ def __init__(self, conf, session, topic, callback):


class Publisher(object):
"""Base Publisher class"""
"""Base Publisher class."""

def __init__(self, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
Expand Down Expand Up @@ -225,24 +245,51 @@ def __init__(self, session, node_name, node_opts=None):
self.reconnect(session)

def reconnect(self, session):
"""Re-establish the Sender after a reconnection"""
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)

def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid_messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg

def send(self, msg):
"""Send a message"""
"""Send a message."""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid_messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid_messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg)


class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
"""Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"})


class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
"""Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
Expand All @@ -252,7 +299,7 @@ def __init__(self, conf, session, topic):


class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
Expand All @@ -262,7 +309,7 @@ def __init__(self, conf, session, topic):


class NotifyPublisher(Publisher):
"""Publisher class for notifications"""
"""Publisher class for notifications."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
Expand Down Expand Up @@ -330,7 +377,7 @@ def _lookup_consumer(self, receiver):
return self.consumers[str(receiver)]

def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
"""Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
Expand Down Expand Up @@ -381,14 +428,20 @@ def ensure(self, error_callback, method, *args, **kwargs):
self.reconnect()

def close(self):
"""Close/release this connection"""
"""Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close()
try:
self.connection.close()
except Exception:
# NOTE(dripton) Logging exceptions that happen during cleanup just
# causes confusion; there's really nothing useful we can do with
# them.
pass
self.connection = None

def reset(self):
"""Reset a connection so it can be used again"""
"""Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
Expand All @@ -412,7 +465,7 @@ def _declare_consumer():
return self.ensure(_connect_error, _declare_consumer)

def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""
"""Return an iterator that will consume from all queues/consumers."""

def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
Expand All @@ -436,7 +489,7 @@ def _consume():
yield self.ensure(_error_callback, _consume)

def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
"""Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
Expand All @@ -451,7 +504,7 @@ def wait_on_proxy_callbacks(self):
proxy_cb.wait()

def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
"""Send to a publisher based on the publisher class."""

def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
Expand Down Expand Up @@ -481,15 +534,15 @@ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
topic, callback)

def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
"""Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)

def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
"""Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)

def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
"""Send a 'topic' message."""
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
Expand All @@ -504,15 +557,15 @@ def topic_send(self, topic, msg, timeout=None):
self.publisher_send(TopicPublisher, topic, qpid_message)

def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)

def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg)

def consume(self, limit=None):
"""Consume from all queues/consumers"""
"""Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
Expand All @@ -521,7 +574,7 @@ def consume(self, limit=None):
return

def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
"""Consumer from all queues/consumers in a greenthread."""
def _consumer_thread():
try:
self.consume()
Expand All @@ -532,7 +585,7 @@ def _consumer_thread():
return self.consumer_thread

def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
"""Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
Expand All @@ -548,7 +601,7 @@ def create_consumer(self, topic, proxy, fanout=False):
return consumer

def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
"""Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
Expand Down Expand Up @@ -591,7 +644,7 @@ def join_consumer_pool(self, callback, pool_name, topic,


def create_connection(conf, new=True):
"""Create a connection"""
"""Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
Expand Down

0 comments on commit 781a8f9

Please sign in to comment.