diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb7882d38f..7e2812bc54 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,6 +11,7 @@ repos: rev: 22.6.0 hooks: - id: black + exclude: ^(.*_pb2.py|.*_pb2_grpc.py) - repo: https://github.com/pycqa/flake8 rev: 5.0.4 diff --git a/linter-requirements.txt b/linter-requirements.txt index d1108f8eae..289df0cd7f 100644 --- a/linter-requirements.txt +++ b/linter-requirements.txt @@ -2,6 +2,7 @@ mypy black flake8==5.0.4 # flake8 depends on pyflakes>=3.0.0 and this dropped support for Python 2 "# type:" comments types-certifi +types-protobuf types-redis types-setuptools pymongo # There is no separate types module. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000..20ee9680f7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,10 @@ +[tool.black] +# 'extend-exclude' excludes files or directories in addition to the defaults +extend-exclude = ''' +# A regex preceded with ^/ will apply only to files and directories +# in the root of the project. +( + .*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project + | .*_pb2_grpc.py # exclude autogenerated Protocol Buffer files anywhere in the project +) +''' diff --git a/sentry_sdk/integrations/fastapi.py b/sentry_sdk/integrations/fastapi.py index 11c9bdcf51..6fbe53b92b 100644 --- a/sentry_sdk/integrations/fastapi.py +++ b/sentry_sdk/integrations/fastapi.py @@ -1,6 +1,7 @@ import asyncio from copy import deepcopy +from sentry_sdk._functools import wraps from sentry_sdk._types import TYPE_CHECKING from sentry_sdk.hub import Hub, _should_send_default_pii from sentry_sdk.integrations import DidNotEnable @@ -79,6 +80,7 @@ def _sentry_get_request_handler(*args, **kwargs): ): old_call = dependant.call + @wraps(old_call) def _sentry_call(*args, **kwargs): # type: (*Any, **Any) -> Any hub = Hub.current diff --git a/sentry_sdk/integrations/grpc/__init__.py b/sentry_sdk/integrations/grpc/__init__.py index 59bfd502e5..2cb7c8192a 100644 --- a/sentry_sdk/integrations/grpc/__init__.py +++ b/sentry_sdk/integrations/grpc/__init__.py @@ -1,2 +1,152 @@ -from .server import ServerInterceptor # noqa: F401 -from .client import ClientInterceptor # noqa: F401 +from functools import wraps + +import grpc +from grpc import Channel, Server, intercept_channel +from grpc.aio import Channel as AsyncChannel +from grpc.aio import Server as AsyncServer + +from sentry_sdk.integrations import Integration +from sentry_sdk._types import TYPE_CHECKING + +from .client import ClientInterceptor +from .server import ServerInterceptor +from .aio.server import ServerInterceptor as AsyncServerInterceptor +from .aio.client import ( + SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor, +) +from .aio.client import ( + SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor, +) + +from typing import Any, Optional, Sequence + +# Hack to get new Python features working in older versions +# without introducing a hard dependency on `typing_extensions` +# from: https://stackoverflow.com/a/71944042/300572 +if TYPE_CHECKING: + from typing import ParamSpec, Callable +else: + # Fake ParamSpec + class ParamSpec: + def __init__(self, _): + self.args = None + self.kwargs = None + + # Callable[anything] will return None + class _Callable: + def __getitem__(self, _): + return None + + # Make instances + Callable = _Callable() + +P = ParamSpec("P") + + +def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]: + "Wrapper for synchronous secure and insecure channel." + + @wraps(func) + def patched_channel(*args: Any, **kwargs: Any) -> Channel: + channel = func(*args, **kwargs) + if not ClientInterceptor._is_intercepted: + ClientInterceptor._is_intercepted = True + return intercept_channel(channel, ClientInterceptor()) + else: + return channel + + return patched_channel + + +def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]: + @wraps(func) + def patched_intercept_channel( + channel: Channel, *interceptors: grpc.ServerInterceptor + ) -> Channel: + if ClientInterceptor._is_intercepted: + interceptors = tuple( + [ + interceptor + for interceptor in interceptors + if not isinstance(interceptor, ClientInterceptor) + ] + ) + else: + interceptors = interceptors + return intercept_channel(channel, *interceptors) + + return patched_intercept_channel # type: ignore + + +def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]: + "Wrapper for asynchronous secure and insecure channel." + + @wraps(func) + def patched_channel( + *args: P.args, + interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None, + **kwargs: P.kwargs, + ) -> Channel: + sentry_interceptors = [ + AsyncUnaryUnaryClientInterceptor(), + AsyncUnaryStreamClientIntercetor(), + ] + interceptors = [*sentry_interceptors, *(interceptors or [])] + return func(*args, interceptors=interceptors, **kwargs) # type: ignore + + return patched_channel # type: ignore + + +def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]: + """Wrapper for synchronous server.""" + + @wraps(func) + def patched_server( + *args: P.args, + interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None, + **kwargs: P.kwargs, + ) -> Server: + interceptors = [ + interceptor + for interceptor in interceptors or [] + if not isinstance(interceptor, ServerInterceptor) + ] + server_interceptor = ServerInterceptor() + interceptors = [server_interceptor, *(interceptors or [])] + return func(*args, interceptors=interceptors, **kwargs) # type: ignore + + return patched_server # type: ignore + + +def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]: + """Wrapper for asynchronous server.""" + + @wraps(func) + def patched_aio_server( + *args: P.args, + interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None, + **kwargs: P.kwargs, + ) -> Server: + server_interceptor = AsyncServerInterceptor() + interceptors = [server_interceptor, *(interceptors or [])] + return func(*args, interceptors=interceptors, **kwargs) # type: ignore + + return patched_aio_server # type: ignore + + +class GRPCIntegration(Integration): + identifier = "grpc" + + @staticmethod + def setup_once() -> None: + import grpc + + grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel) + grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel) + grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel) + + grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel) + grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel) + + grpc.server = _wrap_sync_server(grpc.server) + grpc.aio.server = _wrap_async_server(grpc.aio.server) diff --git a/sentry_sdk/integrations/grpc/aio/__init__.py b/sentry_sdk/integrations/grpc/aio/__init__.py new file mode 100644 index 0000000000..59bfd502e5 --- /dev/null +++ b/sentry_sdk/integrations/grpc/aio/__init__.py @@ -0,0 +1,2 @@ +from .server import ServerInterceptor # noqa: F401 +from .client import ClientInterceptor # noqa: F401 diff --git a/sentry_sdk/integrations/grpc/aio/client.py b/sentry_sdk/integrations/grpc/aio/client.py new file mode 100644 index 0000000000..e0b36541f3 --- /dev/null +++ b/sentry_sdk/integrations/grpc/aio/client.py @@ -0,0 +1,91 @@ +from typing import Callable, Union, AsyncIterable, Any + +from grpc.aio import ( + UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, + ClientCallDetails, + UnaryUnaryCall, + UnaryStreamCall, +) +from google.protobuf.message import Message + +from sentry_sdk import Hub +from sentry_sdk.consts import OP + + +class ClientInterceptor: + @staticmethod + def _update_client_call_details_metadata_from_hub( + client_call_details: ClientCallDetails, hub: Hub + ) -> ClientCallDetails: + metadata = ( + list(client_call_details.metadata) if client_call_details.metadata else [] + ) + for key, value in hub.iter_trace_propagation_headers(): + metadata.append((key, value)) + + client_call_details = ClientCallDetails( + method=client_call_details.method, + timeout=client_call_details.timeout, + metadata=metadata, + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, + ) + + return client_call_details + + +class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor): # type: ignore + async def intercept_unary_unary( + self, + continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall], + client_call_details: ClientCallDetails, + request: Message, + ) -> Union[UnaryUnaryCall, Message]: + hub = Hub.current + method = client_call_details.method + + with hub.start_span( + op=OP.GRPC_CLIENT, description="unary unary call to %s" % method.decode() + ) as span: + span.set_data("type", "unary unary") + span.set_data("method", method) + + client_call_details = self._update_client_call_details_metadata_from_hub( + client_call_details, hub + ) + + response = await continuation(client_call_details, request) + status_code = await response.code() + span.set_data("code", status_code.name) + + return response + + +class SentryUnaryStreamClientInterceptor( + ClientInterceptor, UnaryStreamClientInterceptor # type: ignore +): + async def intercept_unary_stream( + self, + continuation: Callable[[ClientCallDetails, Message], UnaryStreamCall], + client_call_details: ClientCallDetails, + request: Message, + ) -> Union[AsyncIterable[Any], UnaryStreamCall]: + hub = Hub.current + method = client_call_details.method + + with hub.start_span( + op=OP.GRPC_CLIENT, description="unary stream call to %s" % method.decode() + ) as span: + span.set_data("type", "unary stream") + span.set_data("method", method) + + client_call_details = self._update_client_call_details_metadata_from_hub( + client_call_details, hub + ) + + response = await continuation(client_call_details, request) + # status_code = await response.code() + # span.set_data("code", status_code) + + return response diff --git a/sentry_sdk/integrations/grpc/aio/server.py b/sentry_sdk/integrations/grpc/aio/server.py new file mode 100644 index 0000000000..56d21a90a1 --- /dev/null +++ b/sentry_sdk/integrations/grpc/aio/server.py @@ -0,0 +1,95 @@ +from sentry_sdk import Hub +from sentry_sdk._types import MYPY +from sentry_sdk.consts import OP +from sentry_sdk.integrations import DidNotEnable +from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM +from sentry_sdk.utils import event_from_exception + +if MYPY: + from collections.abc import Awaitable, Callable + from typing import Any + + +try: + import grpc + from grpc import HandlerCallDetails, RpcMethodHandler + from grpc.aio import ServicerContext +except ImportError: + raise DidNotEnable("grpcio is not installed") + + +class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore + def __init__(self, find_name=None): + # type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None + self._find_method_name = find_name or self._find_name + + super(ServerInterceptor, self).__init__() + + async def intercept_service(self, continuation, handler_call_details): + # type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Awaitable[RpcMethodHandler] + self._handler_call_details = handler_call_details + handler = await continuation(handler_call_details) + + if not handler.request_streaming and not handler.response_streaming: + handler_factory = grpc.unary_unary_rpc_method_handler + + async def wrapped(request, context): + # type: (Any, ServicerContext) -> Any + name = self._find_method_name(context) + if not name: + return await handler(request, context) + + hub = Hub.current + + # What if the headers are empty? + transaction = Transaction.continue_from_headers( + dict(context.invocation_metadata()), + op=OP.GRPC_SERVER, + name=name, + source=TRANSACTION_SOURCE_CUSTOM, + ) + + with hub.start_transaction(transaction=transaction): + try: + return await handler.unary_unary(request, context) + except Exception as exc: + event, hint = event_from_exception( + exc, + mechanism={"type": "grpc", "handled": False}, + ) + hub.capture_event(event, hint=hint) + raise + + elif not handler.request_streaming and handler.response_streaming: + handler_factory = grpc.unary_stream_rpc_method_handler + + async def wrapped(request, context): # type: ignore + # type: (Any, ServicerContext) -> Any + async for r in handler.unary_stream(request, context): + yield r + + elif handler.request_streaming and not handler.response_streaming: + handler_factory = grpc.stream_unary_rpc_method_handler + + async def wrapped(request, context): + # type: (Any, ServicerContext) -> Any + response = handler.stream_unary(request, context) + return await response + + elif handler.request_streaming and handler.response_streaming: + handler_factory = grpc.stream_stream_rpc_method_handler + + async def wrapped(request, context): # type: ignore + # type: (Any, ServicerContext) -> Any + async for r in handler.stream_stream(request, context): + yield r + + return handler_factory( + wrapped, + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) + + def _find_name(self, context): + # type: (ServicerContext) -> str + return self._handler_call_details.method diff --git a/sentry_sdk/integrations/grpc/client.py b/sentry_sdk/integrations/grpc/client.py index 1eb3621b0b..955c3c4217 100644 --- a/sentry_sdk/integrations/grpc/client.py +++ b/sentry_sdk/integrations/grpc/client.py @@ -11,7 +11,7 @@ from grpc import ClientCallDetails, Call from grpc._interceptor import _UnaryOutcome from grpc.aio._interceptor import UnaryStreamCall - from google.protobuf.message import Message # type: ignore + from google.protobuf.message import Message except ImportError: raise DidNotEnable("grpcio is not installed") @@ -19,6 +19,8 @@ class ClientInterceptor( grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor # type: ignore ): + _is_intercepted = False + def intercept_unary_unary(self, continuation, client_call_details, request): # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome hub = Hub.current @@ -57,7 +59,8 @@ def intercept_unary_stream(self, continuation, client_call_details, request): response = continuation( client_call_details, request ) # type: UnaryStreamCall - span.set_data("code", response.code().name) + # Setting code on unary-stream leads to execution getting stuck + # span.set_data("code", response.code().name) return response diff --git a/sentry_sdk/integrations/grpc/server.py b/sentry_sdk/integrations/grpc/server.py index cdeea4a2fa..ce7c2f2a58 100644 --- a/sentry_sdk/integrations/grpc/server.py +++ b/sentry_sdk/integrations/grpc/server.py @@ -6,7 +6,7 @@ if MYPY: from typing import Callable, Optional - from google.protobuf.message import Message # type: ignore + from google.protobuf.message import Message try: import grpc diff --git a/sentry_sdk/metrics.py b/sentry_sdk/metrics.py index fe8e86b345..0b0abee51b 100644 --- a/sentry_sdk/metrics.py +++ b/sentry_sdk/metrics.py @@ -5,13 +5,14 @@ import random import time import zlib +from datetime import datetime from functools import wraps, partial from threading import Event, Lock, Thread from contextlib import contextmanager +import sentry_sdk from sentry_sdk._compat import text_type -from sentry_sdk.hub import Hub -from sentry_sdk.utils import now, nanosecond_time +from sentry_sdk.utils import now, nanosecond_time, to_timestamp from sentry_sdk.envelope import Envelope, Item from sentry_sdk.tracing import ( TRANSACTION_SOURCE_ROUTE, @@ -29,6 +30,7 @@ from typing import Optional from typing import Generator from typing import Tuple + from typing import Union from sentry_sdk._types import BucketKey from sentry_sdk._types import DurationUnit @@ -406,7 +408,7 @@ def add( value, # type: MetricValue unit, # type: MeasurementUnit tags, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> None if not self._ensure_thread() or self._flusher is None: @@ -414,6 +416,8 @@ def add( if timestamp is None: timestamp = time.time() + elif isinstance(timestamp, datetime): + timestamp = to_timestamp(timestamp) bucket_timestamp = int( (timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS @@ -500,7 +504,7 @@ def _serialize_tags( def _get_aggregator_and_update_tags(key, tags): # type: (str, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[MetricTags]] """Returns the current metrics aggregator if there is one.""" - hub = Hub.current + hub = sentry_sdk.Hub.current client = hub.client if client is None or client.metrics_aggregator is None: return None, tags @@ -531,7 +535,7 @@ def incr( value=1.0, # type: float unit="none", # type: MeasurementUnit tags=None, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> None """Increments a counter.""" @@ -545,7 +549,7 @@ def __init__( self, key, # type: str tags, # type: Optional[MetricTags] - timestamp, # type: Optional[float] + timestamp, # type: Optional[Union[float, datetime]] value, # type: Optional[float] unit, # type: DurationUnit ): @@ -597,7 +601,7 @@ def timing( value=None, # type: Optional[float] unit="second", # type: DurationUnit tags=None, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> _Timing """Emits a distribution with the time it takes to run the given code block. @@ -620,7 +624,7 @@ def distribution( value, # type: float unit="none", # type: MeasurementUnit tags=None, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> None """Emits a distribution.""" @@ -634,7 +638,7 @@ def set( value, # type: MetricValue unit="none", # type: MeasurementUnit tags=None, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> None """Emits a set.""" @@ -648,7 +652,7 @@ def gauge( value, # type: float unit="none", # type: MetricValue tags=None, # type: Optional[MetricTags] - timestamp=None, # type: Optional[float] + timestamp=None, # type: Optional[Union[float, datetime]] ): # type: (...) -> None """Emits a gauge.""" diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index 704339286f..3bdb46f6f6 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -1,7 +1,7 @@ import uuid import random -from datetime import timedelta +from datetime import datetime, timedelta import sentry_sdk from sentry_sdk.consts import INSTRUMENTER @@ -14,13 +14,13 @@ if TYPE_CHECKING: import typing - from datetime import datetime from typing import Any from typing import Dict from typing import Iterator from typing import List from typing import Optional from typing import Tuple + from typing import Union import sentry_sdk.profiler from sentry_sdk._types import Event, MeasurementUnit, SamplingContext @@ -131,7 +131,7 @@ def __init__( status=None, # type: Optional[str] transaction=None, # type: Optional[str] # deprecated containing_transaction=None, # type: Optional[Transaction] - start_timestamp=None, # type: Optional[datetime] + start_timestamp=None, # type: Optional[Union[datetime, float]] ): # type: (...) -> None self.trace_id = trace_id or uuid.uuid4().hex @@ -146,7 +146,11 @@ def __init__( self._tags = {} # type: Dict[str, str] self._data = {} # type: Dict[str, Any] self._containing_transaction = containing_transaction - self.start_timestamp = start_timestamp or datetime_utcnow() + if start_timestamp is None: + start_timestamp = datetime.utcnow() + elif isinstance(start_timestamp, float): + start_timestamp = datetime.utcfromtimestamp(start_timestamp) + self.start_timestamp = start_timestamp try: # profiling depends on this value and requires that # it is measured in nanoseconds @@ -439,7 +443,7 @@ def is_success(self): return self.status == "ok" def finish(self, hub=None, end_timestamp=None): - # type: (Optional[sentry_sdk.Hub], Optional[datetime]) -> Optional[str] + # type: (Optional[sentry_sdk.Hub], Optional[Union[float, datetime]]) -> Optional[str] # Note: would be type: (Optional[sentry_sdk.Hub]) -> None, but that leads # to incompatible return types for Span.finish and Transaction.finish. """Sets the end timestamp of the span. @@ -463,6 +467,8 @@ def finish(self, hub=None, end_timestamp=None): try: if end_timestamp: + if isinstance(end_timestamp, float): + end_timestamp = datetime.utcfromtimestamp(end_timestamp) self.timestamp = end_timestamp else: elapsed = nanosecond_time() - self._start_timestamp_monotonic_ns @@ -627,7 +633,7 @@ def containing_transaction(self): return self def finish(self, hub=None, end_timestamp=None): - # type: (Optional[sentry_sdk.Hub], Optional[datetime]) -> Optional[str] + # type: (Optional[sentry_sdk.Hub], Optional[Union[float, datetime]]) -> Optional[str] """Finishes the transaction and sends it to Sentry. All finished spans in the transaction will also be sent to Sentry. @@ -935,7 +941,7 @@ def get_trace_context(self): return {} def finish(self, hub=None, end_timestamp=None): - # type: (Optional[sentry_sdk.Hub], Optional[datetime]) -> Optional[str] + # type: (Optional[sentry_sdk.Hub], Optional[Union[float, datetime]]) -> Optional[str] pass def set_measurement(self, name, value, unit=""): diff --git a/tests/integrations/fastapi/test_fastapi.py b/tests/integrations/fastapi/test_fastapi.py index 524eed0560..56d52be474 100644 --- a/tests/integrations/fastapi/test_fastapi.py +++ b/tests/integrations/fastapi/test_fastapi.py @@ -377,6 +377,28 @@ def test_transaction_name( ) +def test_route_endpoint_equal_dependant_call(sentry_init): + """ + Tests that the route endpoint name is equal to the wrapped dependant call name. + """ + sentry_init( + auto_enabling_integrations=False, # Make sure that httpx integration is not added, because it adds tracing information to the starlette test clients request. + integrations=[ + StarletteIntegration(), + FastApiIntegration(), + ], + traces_sample_rate=1.0, + debug=True, + ) + + app = fastapi_app_factory() + + for route in app.router.routes: + if not hasattr(route, "dependant"): + continue + assert route.endpoint.__qualname__ == route.dependant.call.__qualname__ + + @pytest.mark.parametrize( "request_url,transaction_style,expected_transaction_name,expected_transaction_source", [ diff --git a/tests/integrations/grpc/__init__.py b/tests/integrations/grpc/__init__.py index 88a0a201e4..f18dce91e2 100644 --- a/tests/integrations/grpc/__init__.py +++ b/tests/integrations/grpc/__init__.py @@ -1,3 +1,8 @@ +import sys +from pathlib import Path + import pytest +# For imports inside gRPC autogenerated code to work +sys.path.append(str(Path(__file__).parent)) pytest.importorskip("grpc") diff --git a/tests/integrations/grpc/compile_test_services.sh b/tests/integrations/grpc/compile_test_services.sh new file mode 100755 index 0000000000..777a27e6e5 --- /dev/null +++ b/tests/integrations/grpc/compile_test_services.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +# Run this script from the project root to generate the python code + +TARGET_PATH=./tests/integrations/grpc + +# Create python file +python -m grpc_tools.protoc \ + --proto_path=$TARGET_PATH/protos/ \ + --python_out=$TARGET_PATH/ \ + --pyi_out=$TARGET_PATH/ \ + --grpc_python_out=$TARGET_PATH/ \ + $TARGET_PATH/protos/grpc_test_service.proto + +echo Code generation successfull diff --git a/tests/integrations/grpc/grpc_test_service.proto b/tests/integrations/grpc/grpc_test_service.proto deleted file mode 100644 index 43497c7129..0000000000 --- a/tests/integrations/grpc/grpc_test_service.proto +++ /dev/null @@ -1,11 +0,0 @@ -syntax = "proto3"; - -package grpc_test_server; - -service gRPCTestService{ - rpc TestServe(gRPCTestMessage) returns (gRPCTestMessage); -} - -message gRPCTestMessage { - string text = 1; -} diff --git a/tests/integrations/grpc/grpc_test_service_pb2.py b/tests/integrations/grpc/grpc_test_service_pb2.py index 94765dae2c..84ea7f632a 100644 --- a/tests/integrations/grpc/grpc_test_service_pb2.py +++ b/tests/integrations/grpc/grpc_test_service_pb2.py @@ -2,26 +2,26 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: grpc_test_service.proto """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database - +from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x17grpc_test_service.proto\x12\x10grpc_test_server"\x1f\n\x0fgRPCTestMessage\x12\x0c\n\x04text\x18\x01 \x01(\t2d\n\x0fgRPCTestService\x12Q\n\tTestServe\x12!.grpc_test_server.gRPCTestMessage\x1a!.grpc_test_server.gRPCTestMessageb\x06proto3' -) -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "grpc_test_service_pb2", globals()) + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17grpc_test_service.proto\x12\x10grpc_test_server\"\x1f\n\x0fgRPCTestMessage\x12\x0c\n\x04text\x18\x01 \x01(\t2\xf8\x02\n\x0fgRPCTestService\x12Q\n\tTestServe\x12!.grpc_test_server.gRPCTestMessage\x1a!.grpc_test_server.gRPCTestMessage\x12Y\n\x0fTestUnaryStream\x12!.grpc_test_server.gRPCTestMessage\x1a!.grpc_test_server.gRPCTestMessage0\x01\x12\\\n\x10TestStreamStream\x12!.grpc_test_server.gRPCTestMessage\x1a!.grpc_test_server.gRPCTestMessage(\x01\x30\x01\x12Y\n\x0fTestStreamUnary\x12!.grpc_test_server.gRPCTestMessage\x1a!.grpc_test_server.gRPCTestMessage(\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'grpc_test_service_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _GRPCTESTMESSAGE._serialized_start = 45 - _GRPCTESTMESSAGE._serialized_end = 76 - _GRPCTESTSERVICE._serialized_start = 78 - _GRPCTESTSERVICE._serialized_end = 178 + DESCRIPTOR._options = None + _globals['_GRPCTESTMESSAGE']._serialized_start=45 + _globals['_GRPCTESTMESSAGE']._serialized_end=76 + _globals['_GRPCTESTSERVICE']._serialized_start=79 + _globals['_GRPCTESTSERVICE']._serialized_end=455 # @@protoc_insertion_point(module_scope) diff --git a/tests/integrations/grpc/grpc_test_service_pb2.pyi b/tests/integrations/grpc/grpc_test_service_pb2.pyi index 02a0b7045b..f16d8a2d65 100644 --- a/tests/integrations/grpc/grpc_test_service_pb2.pyi +++ b/tests/integrations/grpc/grpc_test_service_pb2.pyi @@ -1,32 +1,11 @@ -""" -@generated by mypy-protobuf. Do not edit manually! -isort:skip_file -""" -import builtins -import google.protobuf.descriptor -import google.protobuf.message -import sys +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional -if sys.version_info >= (3, 8): - import typing as typing_extensions -else: - import typing_extensions +DESCRIPTOR: _descriptor.FileDescriptor -DESCRIPTOR: google.protobuf.descriptor.FileDescriptor - -@typing_extensions.final -class gRPCTestMessage(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - TEXT_FIELD_NUMBER: builtins.int - text: builtins.str - def __init__( - self, - *, - text: builtins.str = ..., - ) -> None: ... - def ClearField( - self, field_name: typing_extensions.Literal["text", b"text"] - ) -> None: ... - -global___gRPCTestMessage = gRPCTestMessage +class gRPCTestMessage(_message.Message): + __slots__ = ["text"] + TEXT_FIELD_NUMBER: _ClassVar[int] + text: str + def __init__(self, text: _Optional[str] = ...) -> None: ... diff --git a/tests/integrations/grpc/grpc_test_service_pb2_grpc.py b/tests/integrations/grpc/grpc_test_service_pb2_grpc.py index 73b7d94c16..ad897608ca 100644 --- a/tests/integrations/grpc/grpc_test_service_pb2_grpc.py +++ b/tests/integrations/grpc/grpc_test_service_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -import tests.integrations.grpc.grpc_test_service_pb2 as grpc__test__service__pb2 +import grpc_test_service_pb2 as grpc__test__service__pb2 class gRPCTestServiceStub(object): @@ -15,10 +15,25 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.TestServe = channel.unary_unary( - "/grpc_test_server.gRPCTestService/TestServe", - request_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, - response_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, - ) + '/grpc_test_server.gRPCTestService/TestServe', + request_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + response_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + ) + self.TestUnaryStream = channel.unary_stream( + '/grpc_test_server.gRPCTestService/TestUnaryStream', + request_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + response_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + ) + self.TestStreamStream = channel.stream_stream( + '/grpc_test_server.gRPCTestService/TestStreamStream', + request_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + response_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + ) + self.TestStreamUnary = channel.stream_unary( + '/grpc_test_server.gRPCTestService/TestStreamUnary', + request_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + response_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + ) class gRPCTestServiceServicer(object): @@ -27,53 +42,124 @@ class gRPCTestServiceServicer(object): def TestServe(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TestUnaryStream(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TestStreamStream(self, request_iterator, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TestStreamUnary(self, request_iterator, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') def add_gRPCTestServiceServicer_to_server(servicer, server): rpc_method_handlers = { - "TestServe": grpc.unary_unary_rpc_method_handler( - servicer.TestServe, - request_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, - response_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, - ), + 'TestServe': grpc.unary_unary_rpc_method_handler( + servicer.TestServe, + request_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + response_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + ), + 'TestUnaryStream': grpc.unary_stream_rpc_method_handler( + servicer.TestUnaryStream, + request_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + response_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + ), + 'TestStreamStream': grpc.stream_stream_rpc_method_handler( + servicer.TestStreamStream, + request_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + response_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + ), + 'TestStreamUnary': grpc.stream_unary_rpc_method_handler( + servicer.TestStreamUnary, + request_deserializer=grpc__test__service__pb2.gRPCTestMessage.FromString, + response_serializer=grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - "grpc_test_server.gRPCTestService", rpc_method_handlers - ) + 'grpc_test_server.gRPCTestService', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) -# This class is part of an EXPERIMENTAL API. + # This class is part of an EXPERIMENTAL API. class gRPCTestService(object): """Missing associated documentation comment in .proto file.""" @staticmethod - def TestServe( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_unary( - request, + def TestServe(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/grpc_test_server.gRPCTestService/TestServe', + grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + grpc__test__service__pb2.gRPCTestMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TestUnaryStream(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/grpc_test_server.gRPCTestService/TestUnaryStream', + grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + grpc__test__service__pb2.gRPCTestMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TestStreamStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/grpc_test_server.gRPCTestService/TestStreamStream', + grpc__test__service__pb2.gRPCTestMessage.SerializeToString, + grpc__test__service__pb2.gRPCTestMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TestStreamUnary(request_iterator, target, - "/grpc_test_server.gRPCTestService/TestServe", + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary(request_iterator, target, '/grpc_test_server.gRPCTestService/TestStreamUnary', grpc__test__service__pb2.gRPCTestMessage.SerializeToString, grpc__test__service__pb2.gRPCTestMessage.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/tests/integrations/grpc/protos/grpc_test_service.proto b/tests/integrations/grpc/protos/grpc_test_service.proto new file mode 100644 index 0000000000..9eba747218 --- /dev/null +++ b/tests/integrations/grpc/protos/grpc_test_service.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package grpc_test_server; + +service gRPCTestService{ + rpc TestServe(gRPCTestMessage) returns (gRPCTestMessage); + rpc TestUnaryStream(gRPCTestMessage) returns (stream gRPCTestMessage); + rpc TestStreamStream(stream gRPCTestMessage) returns (stream gRPCTestMessage); + rpc TestStreamUnary(stream gRPCTestMessage) returns (gRPCTestMessage); +} + +message gRPCTestMessage { + string text = 1; +} diff --git a/tests/integrations/grpc/test_grpc.py b/tests/integrations/grpc/test_grpc.py index c6d7a6c6cc..0813d655ae 100644 --- a/tests/integrations/grpc/test_grpc.py +++ b/tests/integrations/grpc/test_grpc.py @@ -1,16 +1,16 @@ from __future__ import absolute_import import os - +from typing import List, Optional from concurrent import futures +from unittest.mock import Mock import grpc import pytest from sentry_sdk import Hub, start_transaction from sentry_sdk.consts import OP -from sentry_sdk.integrations.grpc.client import ClientInterceptor -from sentry_sdk.integrations.grpc.server import ServerInterceptor +from sentry_sdk.integrations.grpc import GRPCIntegration from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( gRPCTestServiceServicer, @@ -24,7 +24,7 @@ @pytest.mark.forked def test_grpc_server_starts_transaction(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0) + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() server = _set_up() @@ -47,9 +47,42 @@ def test_grpc_server_starts_transaction(sentry_init, capture_events_forksafe): assert span["op"] == "test" +@pytest.mark.forked +def test_grpc_server_other_interceptors(sentry_init, capture_events_forksafe): + """Ensure compatibility with additional server interceptors.""" + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + mock_intercept = lambda continuation, handler_call_details: continuation( + handler_call_details + ) + mock_interceptor = Mock() + mock_interceptor.intercept_service.side_effect = mock_intercept + + server = _set_up(interceptors=[mock_interceptor]) + + with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: + stub = gRPCTestServiceStub(channel) + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + mock_interceptor.intercept_service.assert_called_once() + + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" + + @pytest.mark.forked def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0) + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() server = _set_up() @@ -94,14 +127,88 @@ def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe) @pytest.mark.forked def test_grpc_client_starts_span(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0) + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server = _set_up() + + with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: + stub = gRPCTestServiceStub(channel) + + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + + +@pytest.mark.forked +def test_grpc_client_unary_stream_starts_span(sentry_init, capture_events_forksafe): + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server = _set_up() + + with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: + stub = gRPCTestServiceStub(channel) + + with start_transaction(): + [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] + + _tear_down(server=server) + + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + + +# using unittest.mock.Mock not possible because grpc verifies +# that the interceptor is of the correct type +class MockClientInterceptor(grpc.UnaryUnaryClientInterceptor): + call_counter = 0 + + def intercept_unary_unary(self, continuation, client_call_details, request): + self.__class__.call_counter += 1 + return continuation(client_call_details, request) + + +@pytest.mark.forked +def test_grpc_client_other_interceptor(sentry_init, capture_events_forksafe): + """Ensure compatibility with additional client interceptors.""" + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - interceptors = [ClientInterceptor()] server = _set_up() with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - channel = grpc.intercept_channel(channel, *interceptors) + channel = grpc.intercept_channel(channel, MockClientInterceptor()) stub = gRPCTestServiceStub(channel) with start_transaction(): @@ -109,6 +216,8 @@ def test_grpc_client_starts_span(sentry_init, capture_events_forksafe): _tear_down(server=server) + assert MockClientInterceptor.call_counter == 1 + events.write_file.close() events.read_event() local_transaction = events.read_event() @@ -131,14 +240,12 @@ def test_grpc_client_starts_span(sentry_init, capture_events_forksafe): def test_grpc_client_and_servers_interceptors_integration( sentry_init, capture_events_forksafe ): - sentry_init(traces_sample_rate=1.0) + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - interceptors = [ClientInterceptor()] server = _set_up() with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - channel = grpc.intercept_channel(channel, *interceptors) stub = gRPCTestServiceStub(channel) with start_transaction(): @@ -156,13 +263,36 @@ def test_grpc_client_and_servers_interceptors_integration( ) -def _set_up(): +@pytest.mark.forked +def test_stream_stream(sentry_init): + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + _set_up() + with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: + stub = gRPCTestServiceStub(channel) + response_iterator = stub.TestStreamStream(iter((gRPCTestMessage(text="test"),))) + for response in response_iterator: + assert response.text == "test" + + +def test_stream_unary(sentry_init): + """Test to verify stream-stream works. + Tracing not supported for it yet. + """ + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + _set_up() + with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamUnary(iter((gRPCTestMessage(text="test"),))) + assert response.text == "test" + + +def _set_up(interceptors: Optional[List[grpc.ServerInterceptor]] = None): server = grpc.server( futures.ThreadPoolExecutor(max_workers=2), - interceptors=[ServerInterceptor(find_name=_find_name)], + interceptors=interceptors, ) - add_gRPCTestServiceServicer_to_server(TestService, server) + add_gRPCTestServiceServicer_to_server(TestService(), server) server.add_insecure_port("[::]:{}".format(PORT)) server.start() @@ -187,3 +317,18 @@ def TestServe(request, context): # noqa: N802 pass return gRPCTestMessage(text=request.text) + + @staticmethod + def TestUnaryStream(request, context): # noqa: N802 + for _ in range(3): + yield gRPCTestMessage(text=request.text) + + @staticmethod + def TestStreamStream(request, context): # noqa: N802 + for r in request: + yield r + + @staticmethod + def TestStreamUnary(request, context): # noqa: N802 + requests = [r for r in request] + return requests.pop() diff --git a/tests/integrations/grpc/test_grpc_aio.py b/tests/integrations/grpc/test_grpc_aio.py new file mode 100644 index 0000000000..d5a716bb4b --- /dev/null +++ b/tests/integrations/grpc/test_grpc_aio.py @@ -0,0 +1,236 @@ +from __future__ import absolute_import + +import asyncio +import os + +import grpc +import pytest +import pytest_asyncio +import sentry_sdk + +from sentry_sdk import Hub, start_transaction +from sentry_sdk.consts import OP +from sentry_sdk.integrations.grpc import GRPCIntegration +from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage +from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( + gRPCTestServiceServicer, + add_gRPCTestServiceServicer_to_server, + gRPCTestServiceStub, +) + +AIO_PORT = 50052 +AIO_PORT += os.getpid() % 100 # avoid port conflicts when running tests in parallel + + +@pytest.fixture(scope="function") +def event_loop(request): + """Create an instance of the default event loop for each test case.""" + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest_asyncio.fixture(scope="function") +async def grpc_server(sentry_init, event_loop): + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + server = grpc.aio.server() + server.add_insecure_port("[::]:{}".format(AIO_PORT)) + add_gRPCTestServiceServicer_to_server(TestService, server) + + await event_loop.create_task(server.start()) + + try: + yield server + finally: + await server.stop(None) + + +@pytest.mark.asyncio +async def test_grpc_server_starts_transaction(capture_events, grpc_server): + events = capture_events() + + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + await stub.TestServe(gRPCTestMessage(text="test")) + + (event,) = events + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" + + +@pytest.mark.asyncio +async def test_grpc_server_continues_transaction(capture_events, grpc_server): + events = capture_events() + + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + + with sentry_sdk.start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, + ), + ), + ) + + await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + + (event, _) = events + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert span["op"] == "test" + + +@pytest.mark.asyncio +async def test_grpc_server_exception(capture_events, grpc_server): + events = capture_events() + + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + try: + await stub.TestServe(gRPCTestMessage(text="exception")) + raise AssertionError() + except Exception: + pass + + (event, _) = events + + assert event["exception"]["values"][0]["type"] == "TestService.TestException" + assert event["exception"]["values"][0]["value"] == "test" + assert event["exception"]["values"][0]["mechanism"]["handled"] is False + assert event["exception"]["values"][0]["mechanism"]["type"] == "grpc" + + +@pytest.mark.asyncio +async def test_grpc_client_starts_span( + grpc_server, sentry_init, capture_events_forksafe +): + events = capture_events_forksafe() + + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + with start_transaction(): + await stub.TestServe(gRPCTestMessage(text="test")) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + + +@pytest.mark.asyncio +async def test_grpc_client_unary_stream_starts_span( + grpc_server, capture_events_forksafe +): + events = capture_events_forksafe() + + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + with start_transaction(): + response = stub.TestUnaryStream(gRPCTestMessage(text="test")) + [_ async for _ in response] + + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + + +@pytest.mark.asyncio +async def test_stream_stream(grpc_server): + """Test to verify stream-stream works. + Tracing not supported for it yet. + """ + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamStream((gRPCTestMessage(text="test"),)) + async for r in response: + assert r.text == "test" + + +@pytest.mark.asyncio +async def test_stream_unary(grpc_server): + """Test to verify stream-stream works. + Tracing not supported for it yet. + """ + async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: + stub = gRPCTestServiceStub(channel) + response = await stub.TestStreamUnary((gRPCTestMessage(text="test"),)) + assert response.text == "test" + + +class TestService(gRPCTestServiceServicer): + class TestException(Exception): + def __init__(self): + super().__init__("test") + + @classmethod + async def TestServe(cls, request, context): # noqa: N802 + hub = Hub.current + with hub.start_span(op="test", description="test"): + pass + + if request.text == "exception": + raise cls.TestException() + + return gRPCTestMessage(text=request.text) + + @classmethod + async def TestUnaryStream(cls, request, context): # noqa: N802 + for _ in range(3): + yield gRPCTestMessage(text=request.text) + + @classmethod + async def TestStreamStream(cls, request, context): # noqa: N802 + async for r in request: + yield r + + @classmethod + async def TestStreamUnary(cls, request, context): # noqa: N802 + requests = [r async for r in request] + return requests.pop() diff --git a/tox.ini b/tox.ini index 8da78550ef..c0ec2527a6 100644 --- a/tox.ini +++ b/tox.ini @@ -373,6 +373,7 @@ deps = grpc: protobuf grpc: mypy-protobuf grpc: types-protobuf + grpc: pytest-asyncio # HTTPX httpx: pytest-httpx