diff --git a/elasticapm/contrib/grpc/async_server_interceptor.py b/elasticapm/contrib/grpc/async_server_interceptor.py new file mode 100644 index 000000000..5af0c1372 --- /dev/null +++ b/elasticapm/contrib/grpc/async_server_interceptor.py @@ -0,0 +1,68 @@ +# BSD 3-Clause License +# +# Copyright (c) 2022, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import inspect + +import grpc + +import elasticapm +from elasticapm.contrib.grpc.server_interceptor import _ServicerContextWrapper, _wrap_rpc_behavior, get_trace_parent + + +class _AsyncServerInterceptor(grpc.aio.ServerInterceptor): + async def intercept_service(self, continuation, handler_call_details): + def transaction_wrapper(behavior, request_streaming, response_streaming): + async def _interceptor(request_or_iterator, context): + if request_streaming or response_streaming: # only unary-unary is supported + return behavior(request_or_iterator, context) + tp = get_trace_parent(handler_call_details) + client = elasticapm.get_client() + transaction = client.begin_transaction("request", trace_parent=tp) + try: + result = behavior(request_or_iterator, _ServicerContextWrapper(context, transaction)) + + # This is so we can support both sync and async rpc functions + if inspect.isawaitable(result): + result = await result + + if transaction and not transaction.outcome: + transaction.set_success() + return result + except Exception: + if transaction: + transaction.set_failure() + client.capture_exception(handled=False) + raise + finally: + client.end_transaction(name=handler_call_details.method) + + return _interceptor + + return _wrap_rpc_behavior(await continuation(handler_call_details), transaction_wrapper) diff --git a/elasticapm/contrib/grpc/server_interceptor.py b/elasticapm/contrib/grpc/server_interceptor.py index 077bb670e..581d28545 100644 --- a/elasticapm/contrib/grpc/server_interceptor.py +++ b/elasticapm/contrib/grpc/server_interceptor.py @@ -28,8 +28,6 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import inspect - import grpc import wrapt @@ -121,35 +119,3 @@ def _interceptor(request_or_iterator, context): return _interceptor return _wrap_rpc_behavior(continuation(handler_call_details), transaction_wrapper) - - -class _AsyncServerInterceptor(grpc.ServerInterceptor): - async def intercept_service(self, continuation, handler_call_details): - def transaction_wrapper(behavior, request_streaming, response_streaming): - async def _interceptor(request_or_iterator, context): - if request_streaming or response_streaming: # only unary-unary is supported - return behavior(request_or_iterator, context) - tp = get_trace_parent(handler_call_details) - client = elasticapm.get_client() - transaction = client.begin_transaction("request", trace_parent=tp) - try: - result = behavior(request_or_iterator, _ServicerContextWrapper(context, transaction)) - - # This is so we can support both sync and async rpc functions - if inspect.isawaitable(result): - result = await result - - if transaction and not transaction.outcome: - transaction.set_success() - return result - except Exception: - if transaction: - transaction.set_failure() - client.capture_exception(handled=False) - raise - finally: - client.end_transaction(name=handler_call_details.method) - - return _interceptor - - return _wrap_rpc_behavior(await continuation(handler_call_details), transaction_wrapper) diff --git a/elasticapm/instrumentation/packages/grpc.py b/elasticapm/instrumentation/packages/grpc.py index d41538fee..c881577c0 100644 --- a/elasticapm/instrumentation/packages/grpc.py +++ b/elasticapm/instrumentation/packages/grpc.py @@ -93,7 +93,7 @@ def get_instrument_list(self): return [] def call(self, module, method, wrapped, instance, args, kwargs): - from elasticapm.contrib.grpc.server_interceptor import _AsyncServerInterceptor + from elasticapm.contrib.grpc.async_server_interceptor import _AsyncServerInterceptor interceptors = kwargs.get("interceptors") or (args[2] if len(args) > 2 else []) interceptors.insert(0, _AsyncServerInterceptor())