Permalink
Browse files

Merge "Add base support for rpc API versioning."

  • Loading branch information...
2 parents b140b34 + 8ed3059 commit 0472976c97992c8017baf050d35ddd9ea7ad20fe Jenkins committed with openstack-gerrit May 17, 2012
View
@@ -56,6 +56,7 @@
from nova.db import base
from nova import flags
from nova import log as logging
+from nova.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import api
from nova import version
@@ -130,12 +131,23 @@ def __init__(cls, names, bases, dict_):
class Manager(base.Base):
__metaclass__ = ManagerMeta
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
def __init__(self, host=None, db_driver=None):
if not host:
host = FLAGS.host
self.host = host
super(Manager, self).__init__(db_driver)
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return rpc_dispatcher.RpcDispatcher([self])
+
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
View
@@ -17,6 +17,14 @@
# License for the specific language governing permissions and limitations
# under the License.
+"""
+A remote procedure call (rpc) abstraction.
+
+For some wrappers that add message versioning to rpc, see:
+ rpc.dispatcher
+ rpc.proxy
+"""
+
from nova.openstack.common import cfg
from nova.openstack.common import importutils
View
@@ -242,23 +242,26 @@ def __call__(self, message_data):
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
+ version = message_data.get('version', None)
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method, args)
- def _process_data(self, ctxt, method, args):
- """Thread that magically looks for a method on the proxy
- object and calls it.
+ def _process_data(self, ctxt, version, method, args):
+ """Process a message in a new thread.
+
+ If the proxy object we have has a dispatch method
+ (see rpc.dispatcher.RpcDispatcher), pass it the version,
+ method, and args and let it dispatch as appropriate. If not, use
+ the old behavior of magically calling the specified method on the
+ proxy we have here.
"""
ctxt.update_store()
try:
- node_func = getattr(self.proxy, str(method))
- node_args = dict((str(k), v) for k, v in args.iteritems())
- # NOTE(vish): magic is fun!
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(ctxt, version, method, **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
View
@@ -85,6 +85,11 @@ class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
+class UnsupportedRpcVersion(RPCException):
+ message = _("Specified RPC version, %(version)s, not supported by "
+ "this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
View
@@ -0,0 +1,105 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Code for rpc message dispatching.
+
+Messages that come in have a version number associated with them. RPC API
+version numbers are in the form:
+
+ Major.Minor
+
+For a given message with version X.Y, the receiver must be marked as able to
+handle messages of version A.B, where:
+
+ A = X
+
+ B >= Y
+
+The Major version number would be incremented for an almost completely new API.
+The Minor version number would be incremented for backwards compatible changes
+to an existing API. A backwards compatible change could be something like
+adding a new method, adding an argument to an existing method (but not
+requiring it), or changing the type for an existing argument (but still
+handling the old type as well).
+
+The conversion over to a versioned API must be done on both the client side and
+server side of the API at the same time. However, as the code stands today,
+there can be both versioned and unversioned APIs implemented in the same code
+base.
+"""
+
+from nova.rpc import common as rpc_common
+
+
+class RpcDispatcher(object):
+ """Dispatch rpc messages according to the requested API version.
+
+ This class can be used as the top level 'manager' for a service. It
+ contains a list of underlying managers that have an API_VERSION attribute.
+ """
+
+ def __init__(self, callbacks):
+ """Initialize the rpc dispatcher.
+
+ :param callbacks: List of proxy objects that are an instance
+ of a class with rpc methods exposed. Each proxy
+ object should have an RPC_API_VERSION attribute.
+ """
+ self.callbacks = callbacks
+ super(RpcDispatcher, self).__init__()
+
+ @staticmethod
+ def _is_compatible(mversion, version):
+ """Determine whether versions are compatible.
+
+ :param mversion: The API version implemented by a callback.
+ :param version: The API version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ mversion_parts = mversion.split('.')
+ if int(version_parts[0]) != int(mversion_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(mversion_parts[1]): # Minor
+ return False
+ return True
+
+ def dispatch(self, ctxt, version, method, **kwargs):
+ """Dispatch a message based on a requested version.
+
+ :param ctxt: The request context
+ :param version: The requested API version from the incoming message
+ :param method: The method requested to be called by the incoming
+ message.
+ :param kwargs: A dict of keyword arguments to be passed to the method.
+
+ :returns: Whatever is returned by the underlying method that gets
+ called.
+ """
+ if not version:
+ version = '1.0'
+
+ for proxyobj in self.callbacks:
+ if hasattr(proxyobj, 'RPC_API_VERSION'):
+ rpc_api_version = proxyobj.RPC_API_VERSION
+ else:
+ rpc_api_version = '1.0'
+ if not hasattr(proxyobj, method):
+ continue
+ if self._is_compatible(rpc_api_version, version):
+ return getattr(proxyobj, method)(ctxt, **kwargs)
+
+ raise rpc_common.UnsupportedRpcVersion(version=version)
View
@@ -47,15 +47,13 @@ def __init__(self, topic, proxy):
self.topic = topic
self.proxy = proxy
- def call(self, context, method, args, timeout):
- node_func = getattr(self.proxy, method)
- node_args = dict((str(k), v) for k, v in args.iteritems())
+ def call(self, context, version, method, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = node_func(context=ctxt, **node_args)
+ rval = self.proxy.dispatch(context, version, method, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
@@ -129,13 +127,14 @@ def multicall(conf, context, topic, msg, timeout=None):
if not method:
return
args = msg.get('args', {})
+ version = msg.get('version', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, method, args, timeout)
+ return consumer.call(context, version, method, args, timeout)
def call(conf, context, topic, msg, timeout=None):
View
@@ -0,0 +1,161 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A helper class for proxy objects to remote APIs.
+
+For more information about rpc API version numbers, see:
+ rpc/dispatcher.py
+"""
+
+
+from nova import rpc
+
+
+class RpcProxy(object):
+ """A helper class for rpc clients.
+
+ This class is a wrapper around the RPC client API. It allows you to
+ specify the topic and API version in a single place. This is intended to
+ be used as a base class for a class that implements the client side of an
+ rpc API.
+ """
+
+ def __init__(self, topic, default_version):
+ """Initialize an RpcProxy.
+
+ :param topic: The topic to use for all messages.
+ :param default_version: The default API version to request in all
+ outgoing messages. This can be overridden on a per-message
+ basis.
+ """
+ self.topic = topic
+ self.default_version = default_version
+ super(RpcProxy, self).__init__()
+
+ def _set_version(self, msg, vers):
+ """Helper method to set the version in a message.
+
+ :param msg: The message having a version added to it.
+ :param vers: The version number to add to the message.
+ """
+ msg['version'] = vers if vers else self.default_version
+
+ def _get_topic(self, topic):
+ """Return the topic to use for a message."""
+ return topic if topic else self.topic
+
+ @staticmethod
+ def make_msg(method, **kwargs):
+ return {'method': method, 'args': kwargs}
+
+ def call(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.call() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: The return value from the remote method.
+ """
+ self._set_version(msg, version)
+ return rpc.call(context, self._get_topic(topic), msg, timeout)
+
+ def multicall(self, context, msg, topic=None, version=None, timeout=None):
+ """rpc.multicall() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param timeout: (Optional) A timeout to use when waiting for the
+ response. If no timeout is specified, a default timeout will be
+ used that is usually sufficient.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: An iterator that lets you process each of the returned values
+ from the remote method as they arrive.
+ """
+ self._set_version(msg, version)
+ return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+
+ def cast(self, context, msg, topic=None, version=None):
+ """rpc.cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast() does not wait on any return value from the
+ remote method.
+ """
+ self._set_version(msg, version)
+ rpc.cast(context, self._get_topic(topic), msg)
+
+ def fanout_cast(self, context, msg, version=None):
+ """rpc.fanout_cast() a remote method.
+
+ :param context: The request context
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast() does not wait on any return value
+ from the remote method.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast(context, self.topic, msg)
+
+ def cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
+ """rpc.cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
+
+ def fanout_cast_to_server(self, context, server_params, msg, version=None):
+ """rpc.fanout_cast_to_server() a remote method.
+
+ :param context: The request context
+ :param server_params: Server parameters. See rpc.cast_to_server() for
+ details.
+ :param msg: The message to send, including the method and args.
+ :param version: (Optional) Override the requested API version in this
+ message.
+
+ :returns: None. rpc.fanout_cast_to_server() does not wait on any
+ return values.
+ """
+ self._set_version(msg, version)
+ rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
Oops, something went wrong.

0 comments on commit 0472976

Please sign in to comment.