Skip to content

Commit

Permalink
Merge "Don't block forever for rpc.(multi)call response."
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Feb 2, 2012
2 parents 44bd902 + e0c59af commit def8544
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 66 deletions.
18 changes: 14 additions & 4 deletions nova/rpc/__init__.py
Expand Up @@ -48,7 +48,7 @@ def create_connection(new=True):
return _get_impl().create_connection(new=new)


def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
Expand All @@ -59,10 +59,15 @@ def call(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: A dict from the remote method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().call(context, topic, msg)
return _get_impl().call(context, topic, msg, timeout)


def cast(context, topic, msg):
Expand Down Expand Up @@ -102,7 +107,7 @@ def fanout_cast(context, topic, msg):
return _get_impl().fanout_cast(context, topic, msg)


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
Expand All @@ -117,13 +122,18 @@ def multicall(context, topic, msg):
when the consumer was created with fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
returned and X is the Nth value that was returned by the remote
method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
"""
return _get_impl().multicall(context, topic, msg)
return _get_impl().multicall(context, topic, msg, timeout)


def notify(context, topic, msg):
Expand Down
13 changes: 7 additions & 6 deletions nova/rpc/amqp.py
Expand Up @@ -262,9 +262,10 @@ def _process_data(self, ctxt, method, args):


class MulticallWaiter(object):
def __init__(self, connection):
def __init__(self, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume()
self._iterator = connection.iterconsume(
timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
Expand Down Expand Up @@ -307,7 +308,7 @@ def create_connection(new=True):
return ConnectionContext(pooled=not new)


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
Expand All @@ -320,15 +321,15 @@ def multicall(context, topic, msg):
pack_context(msg, context)

conn = ConnectionContext()
wait_msg = MulticallWaiter(conn)
wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg


def call(context, topic, msg):
def call(context, topic, msg, timeout):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
Expand Down
12 changes: 12 additions & 0 deletions nova/rpc/common.py
Expand Up @@ -34,6 +34,9 @@
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool'),
cfg.IntOpt('rpc_response_timeout',
default=3600,
help='Seconds to wait for a response from call or multicall'),
]

flags.FLAGS.add_options(rpc_opts)
Expand All @@ -59,6 +62,15 @@ def __init__(self, exc_type=None, value=None, traceback=None):
traceback=traceback)


class Timeout(exception.NovaException):
"""Signifies that a timeout has occurred.
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
message = _("Timeout while waiting on RPC response.")


class Connection(object):
"""A connection, returned by rpc.create_connection().
Expand Down
7 changes: 4 additions & 3 deletions nova/rpc/impl_carrot.py
Expand Up @@ -522,8 +522,9 @@ def reply(self, reply=None, failure=None, ending=False):
self.msg_id = None


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
# NOTE(russellb): carrot doesn't support timeouts
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
Expand Down Expand Up @@ -594,9 +595,9 @@ def create_connection(new=True):
return Connection.instance(new=new)


def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
Expand Down
79 changes: 52 additions & 27 deletions nova/rpc/impl_fake.py
Expand Up @@ -18,14 +18,21 @@
"""

import inspect
import signal
import sys
import time
import traceback

import eventlet

from nova import context
from nova import flags
from nova.rpc import common as rpc_common

CONSUMERS = {}

FLAGS = flags.FLAGS


class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
Expand All @@ -45,31 +52,49 @@ def __init__(self, topic, proxy):
self.topic = topic
self.proxy = proxy

def call(self, context, method, args):
def call(self, context, method, args, timeout):
node_func = getattr(self.proxy, method)
node_args = dict((str(k), v) for k, v in args.iteritems())

ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
yield reply
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if inspect.isgenerator(rval):
for val in rval:
yield val
else:
yield rval
except Exception:
exc_info = sys.exc_info()
raise rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info)))
done = eventlet.event.Event()

def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
res.append(reply)
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if inspect.isgenerator(rval):
for val in rval:
res.append(val)
else:
res.append(rval)
done.send(res)
except Exception:
exc_info = sys.exc_info()
done.send_exception(
rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info))))

thread = eventlet.greenthread.spawn(_inner)

if timeout:
start_time = time.time()
while not done.ready():
eventlet.greenthread.sleep(1)
cur_time = time.time()
if (cur_time - start_time) > timeout:
thread.kill()
raise rpc_common.Timeout()

return done.wait()


class Connection(object):
Expand Down Expand Up @@ -99,7 +124,7 @@ def create_connection(new=True):
return Connection()


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""

method = msg.get('method')
Expand All @@ -112,12 +137,12 @@ def multicall(context, topic, msg):
except (KeyError, IndexError):
return iter([None])
else:
return consumer.call(context, method, args)
return consumer.call(context, method, args, timeout)


def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
rv = multicall(context, topic, msg, timeout)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
Expand Down
26 changes: 16 additions & 10 deletions nova/rpc/impl_kombu.py
Expand Up @@ -15,6 +15,7 @@
# under the License.

import itertools
import socket
import sys
import time
import uuid
Expand Down Expand Up @@ -425,7 +426,7 @@ def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except self.connection_errors, e:
except (self.connection_errors, socket.timeout), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
Expand Down Expand Up @@ -478,15 +479,20 @@ def _declare_consumer():

return self.ensure(_connect_error, _declare_consumer)

def iterconsume(self, limit=None):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""

info = {'do_consume': True}

def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True

def _consume():
if info['do_consume']:
Expand All @@ -496,7 +502,7 @@ def _consume():
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
info['do_consume'] = False
return self.connection.drain_events()
return self.connection.drain_events(timeout=timeout)

for iteration in itertools.count(0):
if limit and iteration >= limit:
Expand Down Expand Up @@ -595,14 +601,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg)
return rpc_amqp.multicall(context, topic, msg, timeout)


def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg)
return rpc_amqp.call(context, topic, msg, timeout)


def cast(context, topic, msg):
Expand Down
25 changes: 16 additions & 9 deletions nova/rpc/impl_qpid.py
Expand Up @@ -28,6 +28,7 @@
from nova.common import cfg
from nova import flags
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.rpc.common import LOG


Expand Down Expand Up @@ -338,7 +339,8 @@ def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except qpid.messaging.exceptions.ConnectionError, e:
except (qpid.messaging.exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
Expand Down Expand Up @@ -372,15 +374,20 @@ def _declare_consumer():

return self.ensure(_connect_error, _declare_consumer)

def iterconsume(self, limit=None):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers"""

def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))

def _consume():
nxt_receiver = self.session.next_receiver()
nxt_receiver = self.session.next_receiver(timeout=timeout)
self._lookup_consumer(nxt_receiver).consume()

for iteration in itertools.count(0):
Expand Down Expand Up @@ -483,14 +490,14 @@ def create_connection(new=True):
return rpc_amqp.create_connection(new)


def multicall(context, topic, msg):
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(context, topic, msg)
return rpc_amqp.multicall(context, topic, msg, timeout)


def call(context, topic, msg):
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(context, topic, msg)
return rpc_amqp.call(context, topic, msg, timeout)


def cast(context, topic, msg):
Expand Down

0 comments on commit def8544

Please sign in to comment.