Skip to content

Commit

Permalink
Merge "Fix RPC responses to allow None response correctly." into stab…
Browse files Browse the repository at this point in the history
…le/diablo
  • Loading branch information
Jenkins authored and openstack-gerrit committed Dec 23, 2011
2 parents e986fa7 + 475f362 commit b4cfd9a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 48 deletions.
88 changes: 47 additions & 41 deletions nova/rpc/impl_carrot.py
Expand Up @@ -266,15 +266,14 @@ def process_data(self, message_data, message):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
if msg_id:
msg_reply(msg_id,
_('No method for message: %s') % message_data)
ctxt.reply(msg_id,
_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
self.pool.spawn_n(self._process_data, ctxt, method, args)

@exception.wrap_exception()
def _process_data(self, msg_id, ctxt, method, args):
"""Thread that maigcally looks for a method on the proxy
def _process_data(self, ctxt, method, args):
"""Thread that magically looks for a method on the proxy
object and calls it.
"""

Expand All @@ -283,23 +282,18 @@ def _process_data(self, msg_id, ctxt, method, args):
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
msg_reply(msg_id, x, None)
else:
msg_reply(msg_id, rval, None)

# This final None tells multicall that it is done.
msg_reply(msg_id, None, None)
elif isinstance(rval, types.GeneratorType):
# NOTE(vish): this iterates through the generator
list(rval)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
ctxt.reply(x, None)
else:
ctxt.reply(rval, None)

# This final None tells multicall that it is done.
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
ctxt.reply(None, sys.exc_info())
return


Expand Down Expand Up @@ -447,7 +441,7 @@ def __init__(self, connection=None, msg_id=None):
super(DirectPublisher, self).__init__(connection=connection)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -463,12 +457,17 @@ def msg_reply(msg_id, reply=None, failure=None):
with ConnectionPool.item() as conn:
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)

publisher.close()

Expand Down Expand Up @@ -508,8 +507,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
msg_reply(self.msg_id, *args, **kwargs)
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


def multicall(context, topic, msg):
Expand Down Expand Up @@ -537,8 +539,11 @@ def __init__(self, consumer):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
self._got_ending = False

def close(self):
if self._closed:
return
self._closed = True
self._consumer.close()
ConnectionPool.put(self._consumer.connection)
Expand All @@ -548,30 +553,31 @@ def __call__(self, data, message):
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
elif data.get('ending', False):
self._got_ending = True
else:
self._results.put(data['result'])

def __iter__(self):
return self.wait()

def wait(self):
while True:
rv = None
while rv is None and not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
while not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
if rv is None:
time.sleep(0.01)

continue
if self._got_ending:
self.close()
raise StopIteration
result = self._results.get()
if isinstance(result, Exception):
self.close()
raise result
if result == None:
self.close()
raise StopIteration
yield result


Expand Down
23 changes: 16 additions & 7 deletions nova/rpc/impl_kombu.py
Expand Up @@ -625,7 +625,7 @@ def _process_data(self, ctxt, method, args):
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
ctxt.reply(None, None)
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
Expand Down Expand Up @@ -668,9 +668,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, *args, **kwargs)
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


class MulticallWaiter(object):
Expand All @@ -679,8 +681,11 @@ def __init__(self, connection):
self._iterator = connection.iterconsume()
self._result = None
self._done = False
self._got_ending = False

def done(self):
if self._done:
return
self._done = True
self._iterator.close()
self._iterator = None
Expand All @@ -690,6 +695,8 @@ def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:
self._result = data['result']

Expand All @@ -699,13 +706,13 @@ def __iter__(self):
raise StopIteration
while True:
self._iterator.next()
if self._got_ending:
self.done()
raise StopIteration
result = self._result
if isinstance(result, Exception):
self.done()
raise result
if result == None:
self.done()
raise StopIteration
yield result


Expand Down Expand Up @@ -760,7 +767,7 @@ def fanout_cast(context, topic, msg):
conn.fanout_send(topic, msg)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -780,4 +787,6 @@ def msg_reply(msg_id, reply=None, failure=None):
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
18 changes: 18 additions & 0 deletions nova/tests/test_rpc_common.py
Expand Up @@ -90,6 +90,17 @@ def test_multicall_succeed_three_times_yield(self):
for i, x in enumerate(result):
self.assertEqual(value + i, x)

def test_multicall_three_nones(self):
value = 42
result = self.rpc.multicall(self.context,
'test',
{"method": "multicall_three_nones",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(x, None)
# i should have been 0, 1, and finally 2:
self.assertEqual(i, 2)

def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
Expand Down Expand Up @@ -176,6 +187,13 @@ def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
context.reply(ending=True)

@staticmethod
def multicall_three_nones(context, value):
yield None
yield None
yield None

@staticmethod
def echo_three_times_yield(context, value):
Expand Down
2 changes: 2 additions & 0 deletions run_tests.py
Expand Up @@ -64,6 +64,7 @@

gettext.install('nova', unicode=1)

import eventlet
from nose import config
from nose import core
from nose import result
Expand Down Expand Up @@ -336,6 +337,7 @@ def run(self, test):


if __name__ == '__main__':
eventlet.monkey_patch()
logging.setup()
# If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much
Expand Down

0 comments on commit b4cfd9a

Please sign in to comment.