Skip to content

Commit

Permalink
Fix RPC responses to allow None response correctly.
Browse files Browse the repository at this point in the history
Fixes bug 897155

Cherry picked from 84693b4

Also adds a new fake rpc implementation that tests use by default.
This speeds up the test run by ~10% on my system.  We can decide to
ditch fake_rabbit at some point later..

Change-Id: I8877fad3d41ae055c15b1adff99e535c34e9ce92
  • Loading branch information
comstud committed Dec 9, 2011
1 parent 8992773 commit 475f362
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 48 deletions.
88 changes: 47 additions & 41 deletions nova/rpc/impl_carrot.py
Expand Up @@ -266,15 +266,14 @@ def process_data(self, message_data, message):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
if msg_id:
msg_reply(msg_id,
_('No method for message: %s') % message_data)
ctxt.reply(msg_id,
_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
self.pool.spawn_n(self._process_data, ctxt, method, args)

@exception.wrap_exception()
def _process_data(self, msg_id, ctxt, method, args):
"""Thread that maigcally looks for a method on the proxy
def _process_data(self, ctxt, method, args):
"""Thread that magically looks for a method on the proxy
object and calls it.
"""

Expand All @@ -283,23 +282,18 @@ def _process_data(self, msg_id, ctxt, method, args):
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
msg_reply(msg_id, x, None)
else:
msg_reply(msg_id, rval, None)

# This final None tells multicall that it is done.
msg_reply(msg_id, None, None)
elif isinstance(rval, types.GeneratorType):
# NOTE(vish): this iterates through the generator
list(rval)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
ctxt.reply(x, None)
else:
ctxt.reply(rval, None)

# This final None tells multicall that it is done.
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
ctxt.reply(None, sys.exc_info())
return


Expand Down Expand Up @@ -447,7 +441,7 @@ def __init__(self, connection=None, msg_id=None):
super(DirectPublisher, self).__init__(connection=connection)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -463,12 +457,17 @@ def msg_reply(msg_id, reply=None, failure=None):
with ConnectionPool.item() as conn:
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)

publisher.close()

Expand Down Expand Up @@ -508,8 +507,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
msg_reply(self.msg_id, *args, **kwargs)
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


def multicall(context, topic, msg):
Expand Down Expand Up @@ -537,8 +539,11 @@ def __init__(self, consumer):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
self._got_ending = False

def close(self):
if self._closed:
return
self._closed = True
self._consumer.close()
ConnectionPool.put(self._consumer.connection)
Expand All @@ -548,30 +553,31 @@ def __call__(self, data, message):
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
elif data.get('ending', False):
self._got_ending = True
else:
self._results.put(data['result'])

def __iter__(self):
return self.wait()

def wait(self):
while True:
rv = None
while rv is None and not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
while not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
if rv is None:
time.sleep(0.01)

continue
if self._got_ending:
self.close()
raise StopIteration
result = self._results.get()
if isinstance(result, Exception):
self.close()
raise result
if result == None:
self.close()
raise StopIteration
yield result


Expand Down
23 changes: 16 additions & 7 deletions nova/rpc/impl_kombu.py
Expand Up @@ -625,7 +625,7 @@ def _process_data(self, ctxt, method, args):
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
ctxt.reply(None, None)
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
Expand Down Expand Up @@ -668,9 +668,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, *args, **kwargs)
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


class MulticallWaiter(object):
Expand All @@ -679,15 +681,20 @@ def __init__(self, connection):
self._iterator = connection.iterconsume()
self._result = None
self._done = False
self._got_ending = False

def done(self):
if self._done:
return
self._done = True
self._connection.close()

def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:
self._result = data['result']

Expand All @@ -697,13 +704,13 @@ def __iter__(self):
raise StopIteration
while True:
self._iterator.next()
if self._got_ending:
self.done()
raise StopIteration
result = self._result
if isinstance(result, Exception):
self.done()
raise result
if result == None:
self.done()
raise StopIteration
yield result


Expand Down Expand Up @@ -758,7 +765,7 @@ def fanout_cast(context, topic, msg):
conn.fanout_send(topic, msg)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -778,4 +785,6 @@ def msg_reply(msg_id, reply=None, failure=None):
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
18 changes: 18 additions & 0 deletions nova/tests/test_rpc_common.py
Expand Up @@ -90,6 +90,17 @@ def test_multicall_succeed_three_times_yield(self):
for i, x in enumerate(result):
self.assertEqual(value + i, x)

def test_multicall_three_nones(self):
value = 42
result = self.rpc.multicall(self.context,
'test',
{"method": "multicall_three_nones",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(x, None)
# i should have been 0, 1, and finally 2:
self.assertEqual(i, 2)

def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
Expand Down Expand Up @@ -176,6 +187,13 @@ def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
context.reply(ending=True)

@staticmethod
def multicall_three_nones(context, value):
yield None
yield None
yield None

@staticmethod
def echo_three_times_yield(context, value):
Expand Down
2 changes: 2 additions & 0 deletions run_tests.py
Expand Up @@ -64,6 +64,7 @@

gettext.install('nova', unicode=1)

import eventlet
from nose import config
from nose import core
from nose import result
Expand Down Expand Up @@ -336,6 +337,7 @@ def run(self, test):


if __name__ == '__main__':
eventlet.monkey_patch()
logging.setup()
# If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much
Expand Down

0 comments on commit 475f362

Please sign in to comment.