Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Support RPC envelopes in impl_zmq
Browse files Browse the repository at this point in the history
This patch began as a set
of tests verifying the functionality of
sending and receiving RPC envelopes when
using impl_zmq. It was discovered that
when enabled, RPC envelopes were not
actually working,

The ZeroMQ driver includes its own envelopes.
This patch introduce versioning to that
envelope, eliminating the previously reserved
'style' field.

A new iteration of the zeromq-envelope is
introduced, 'impl_zmq_v2'. It specifies
that the zeromq-envelope should be followed
by an unpacked array representing key value
pairs of the standard RPC Envelope.

Because the key-values of the RPC Envelope
can be successfully transformed with bytes(),
this prevents the need to double-serialize
the content traversing the message bus.

Also removes some unused imports.

Closes bug 1123709
Closes bug 1055446

Change-Id: Ib04e3d092c9596146f1048d3502ac248496d313b
  • Loading branch information
Eric Windisch committed Feb 19, 2013
1 parent 8316250 commit f1e5d56
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
93 changes: 67 additions & 26 deletions openstack/common/rpc/impl_zmq.py
Expand Up @@ -216,12 +216,18 @@ def __init__(self, addr, socket_type=None, bind=False):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)

def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0

if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return

rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))

def close(self):
self.outq.close()
Expand Down Expand Up @@ -320,7 +326,7 @@ def normalize_reply(self, result, replies):
else:
return [result]

def process(self, style, target, proxy, ctx, data):
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})

Expand Down Expand Up @@ -432,7 +438,7 @@ def consume(self, sock):

#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = data[1]
topic = topic.split('.', 1)[0]

LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
Expand Down Expand Up @@ -520,6 +526,21 @@ def consume_in_thread(self):
super(ZmqProxy, self).consume_in_thread()


def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h


class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
Expand All @@ -540,15 +561,27 @@ def consume(self, sock):
self.mapping[sock].send(data)
return

msg_id, topic, style, in_msg = data
proxy = self.proxies[sock]

ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
if data[2] == 'cast': # Legacy protocol
packenv = data[3]

proxy = self.proxies[sock]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]

self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)

# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return

self.pool.spawn_n(self.process, proxy, ctx, request)


class Connection(rpc_common.Connection):
Expand Down Expand Up @@ -593,8 +626,8 @@ def consume_in_thread(self):
self.reactor.consume_in_thread()


def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]

Expand All @@ -603,7 +636,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)

# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
Expand All @@ -612,7 +645,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,


def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout

Expand Down Expand Up @@ -647,15 +680,24 @@ def _call(addr, context, topic, msg, timeout=None,
)

LOG.debug(_("Sending cast"))
_cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
_cast(addr, context, topic, payload, envelope)

LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])[-1]['args']['response']

if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))

responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
Expand All @@ -676,8 +718,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]


def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
Expand All @@ -703,11 +745,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,

if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, msg, timeout, serialize,
force_envelope, _msg_id)
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
envelope)


def create_connection(conf, new=True):
Expand Down Expand Up @@ -746,8 +788,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)


Expand Down
12 changes: 10 additions & 2 deletions tests/unit/rpc/test_zmq.py
Expand Up @@ -30,8 +30,7 @@

from openstack.common import exception
from openstack.common.gettextutils import _
from openstack.common import processutils
from openstack.common import rpc
from openstack.common.rpc import common as rpc_common
from tests.unit.rpc import common

try:
Expand Down Expand Up @@ -110,6 +109,15 @@ class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
pass


class RpcZmqEnvelopeEnabledTestCase(_RpcZmqBaseTestCase):
"""
This sends messages with envelopes enabled.
"""
def setUp(self):
super(RpcZmqEnvelopeEnabledTestCase, self).setUp()
self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)


class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
"""
Test communication directly to a host,
Expand Down

0 comments on commit f1e5d56

Please sign in to comment.