New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC Python Client and Server Interceptors #12778

Closed
wants to merge 3 commits into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+423 −23
Diff settings

Always

Just for now

Next

Add client interceptors to gRPC Python

Implements the client-side interceptor machinery for gRPC Python.
  • Loading branch information...
mehrdada committed Sep 29, 2017
commit a938f50ffd8edf26216e00936b63f20456d5c376
@@ -342,6 +342,181 @@ def details(self):
raise NotImplementedError()
############## Invocation-Side Interceptor Interfaces & Classes ##############
class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-unary invocations.
This is an EXPERIMENTAL API and is subject to change without notice."""
@abc.abstractmethod
def intercept_unary_unary_call(self, invoker, method, request, **kwargs):
"""Intercepts a synchronous unary-unary invocation.
Args:
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
response, call = invoker(method, request, **kwargs)
to continue with the RPC. invoker can raise RpcError
to indicate an RPC terminated with non-OK status.
method: The name of the RPC method.
request: The request value for the RPC.
Returns:
The response value for the RPC and a Call object for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
@abc.abstractmethod
def intercept_unary_unary_future(self, invoker, method, request, **kwargs):
"""Intercepts an asynchronous unary-unary invocation.
Args:
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
future = invoker(method, request, **kwargs)
to continue with the RPC. invoker returns an object that
is both a Call for the RPC and a Future. In the event of
RPC completion, the return Call-Future's result value will
be the response message of the RPC.
method: The name of the RPC method.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and a Future. In the event of
RPC completion, the return Call-Future's result value will be the
response message of the RPC. Should the event terminate with non-OK
status, the returned Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-stream invocations.
This is an EXPERIMENTAL API and is subject to change without notice."""
@abc.abstractmethod
def intercept_unary_stream_call(self, invoker, method, request, **kwargs):
"""Intercepts a unary-stream invocation.
Args:
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
response_iterator = invoker(method, request, **kwargs)
to continue with the RPC. invoker returns an object that
is both a Call for the RPC and an iterator of response
values.
method: The name of the RPC method.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK status.
"""
raise NotImplementedError()
class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-unary invocations.
This is an EXPERIMENTAL API and is subject to change without notice."""
@abc.abstractmethod
def intercept_stream_unary_call(self, invoker, method, request_iterator,
**kwargs):
"""Intercepts a synchronous stream-unary invocation.
Args:
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
reponse, call = (method, request_iterator, **kwargs)
to continue with the RPC. invoker can raise RpcError
to indicate an RPC terminated with non-OK status.
method: The name of the RPC method.
request_iterator: An iterator that yields request values for the RPC.
Returns:
The response value for the RPC and a Call object for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
@abc.abstractmethod
def intercept_stream_unary_future(self, invoker, method, request_iterator,
**kwargs):
"""Intercepts an asynchronous stream-unary invocation.
Args:
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
future = invoker(method, request_iterator, **kwargs)
to continue with the RPC. invoker returns an object that
is both a Call for the RPC and a Future. In the event of
RPC completion, the return Call-Future's result value will
be the response message of the RPC.
method: The name of the RPC method.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and a Future. In the event of
RPC completion, the return Call-Future's result value will be the
response message of the RPC. Should the event terminate with non-OK
status, the returned Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-stream invocations.
This is an EXPERIMENTAL API and is subject to change without notice."""
@abc.abstractmethod
def intercept_stream_stream_call(self, invoker, method, request_iterator,
**kwargs):
"""Intercepts a stream-stream invocation.
invoker: The handler to pass control to to continue with
the invocation. It is the interceptor's responsibility
to call it if it decides to move the RPC machinery
forward. The interceptor can use
response_iterator = invoker(method, request_iterator, **kwargs)
to continue with the RPC. invoker returns an object that
is both a Call for the RPC and an iterator of response
values.
method: The name of the RPC method.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response
values. Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK status.
"""
raise NotImplementedError()
############ Authentication & Authorization Interfaces & Classes #############
@@ -1301,6 +1476,34 @@ def secure_channel(target, credentials, options=None):
credentials._credentials)
def intercept_channel(channel, *interceptors):
"""Intercepts a channel through a set of interceptors.
This is an EXPERIMENTAL API and is subject to change without notice.
Args:
channel: A Channel.
interceptors: Zero or more objects of type
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
Interceptors are given control in the order they are listed.
Returns:
A Channel that intercepts each invocation via the provided interceptors.
Raises:
TypeError: If interceptor does not derive from any of
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
"""
from grpc import _interceptor # pylint: disable=cyclic-import
return _interceptor.intercept_channel(channel, *interceptors)
def server(thread_pool,
handlers=None,
options=None,
@@ -1330,23 +1533,24 @@ def server(thread_pool,
################################### __all__ #################################
__all__ = ('FutureTimeoutError', 'FutureCancelledError', 'Future',
'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext',
'Call', 'ChannelCredentials', 'CallCredentials',
'AuthMetadataContext', 'AuthMetadataPluginCallback',
'AuthMetadataPlugin', 'ServerCredentials', 'UnaryUnaryMultiCallable',
'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable',
'StreamStreamMultiCallable', 'Channel', 'ServicerContext',
'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler',
'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler',
'unary_stream_rpc_method_handler', 'stream_unary_rpc_method_handler',
'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'channel_ready_future', 'insecure_channel',
'secure_channel', 'server',)
__all__ = (
'FutureTimeoutError', 'FutureCancelledError', 'Future',
'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call',
'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext',
'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ServerCredentials',
'UnaryUnaryClientInterceptor', 'UnaryUnaryMultiCallable',
'UnaryStreamClientInterceptor', 'UnaryStreamMultiCallable',
'StreamUnaryClientInterceptor', 'StreamUnaryMultiCallable',
'StreamStreamClientInterceptor', 'StreamStreamMultiCallable', 'Channel',
'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails',
'GenericRpcHandler', 'ServiceRpcHandler', 'Server',
'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler',
'stream_unary_rpc_method_handler', 'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'channel_ready_future', 'insecure_channel',
'intercept_channel', 'secure_channel', 'server',)
############################### Extension Shims ################################
# Here to maintain backwards compatibility; avoid using these in new code!
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.