From b4d59976256751a6a9b00573d45d38ce04d6e920 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Thu, 18 Jan 2024 17:00:09 +0100 Subject: [PATCH 1/2] Remove store endpoint --- sentry_sdk/_types.py | 1 - sentry_sdk/client.py | 66 ++++------ sentry_sdk/consts.py | 11 ++ sentry_sdk/envelope.py | 11 +- sentry_sdk/transport.py | 123 ++++++++---------- sentry_sdk/utils.py | 15 +-- tests/conftest.py | 59 ++------- .../excepthook/test_excepthook.py | 22 ++-- tests/integrations/gcp/test_gcp.py | 88 ++++++------- tests/test_client.py | 108 ++------------- tests/test_monitor.py | 5 +- tests/test_transport.py | 7 +- 12 files changed, 177 insertions(+), 339 deletions(-) diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index 2536541072..e304156c60 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -57,7 +57,6 @@ "monitor", ] SessionStatus = Literal["ok", "exited", "crashed", "abnormal"] - EndpointType = Literal["store", "envelope"] DurationUnit = Literal[ "nanosecond", diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index c476c9afb8..c4bf136261 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -17,7 +17,7 @@ logger, ) from sentry_sdk.serializer import serialize -from sentry_sdk.tracing import trace, has_tracing_enabled +from sentry_sdk.tracing import trace from sentry_sdk.transport import make_transport from sentry_sdk.consts import ( DEFAULT_MAX_VALUE_LENGTH, @@ -603,58 +603,40 @@ def capture_event( ): return None - tracing_enabled = has_tracing_enabled(self.options) attachments = hint.get("attachments") trace_context = event_opt.get("contexts", {}).get("trace") or {} dynamic_sampling_context = trace_context.pop("dynamic_sampling_context", {}) - # If tracing is enabled all events should go to /envelope endpoint. - # If no tracing is enabled only transactions, events with attachments, and checkins should go to the /envelope endpoint. - should_use_envelope_endpoint = ( - tracing_enabled - or is_transaction - or is_checkin - or bool(attachments) - or bool(self.spotlight) - ) - if should_use_envelope_endpoint: - headers = { - "event_id": event_opt["event_id"], - "sent_at": format_timestamp(datetime.now(timezone.utc)), - } - - if dynamic_sampling_context: - headers["trace"] = dynamic_sampling_context - - envelope = Envelope(headers=headers) - - if is_transaction: - if profile is not None: - envelope.add_profile(profile.to_json(event_opt, self.options)) - envelope.add_transaction(event_opt) - elif is_checkin: - envelope.add_checkin(event_opt) - else: - envelope.add_event(event_opt) + headers = { + "event_id": event_opt["event_id"], + "sent_at": format_timestamp(datetime.now(timezone.utc)), + } - for attachment in attachments or (): - envelope.add_item(attachment.to_envelope_item()) + if dynamic_sampling_context: + headers["trace"] = dynamic_sampling_context - if self.spotlight: - self.spotlight.capture_envelope(envelope) + envelope = Envelope(headers=headers) - if self.transport is None: - return None + if is_transaction: + if profile is not None: + envelope.add_profile(profile.to_json(event_opt, self.options)) + envelope.add_transaction(event_opt) + elif is_checkin: + envelope.add_checkin(event_opt) + else: + envelope.add_event(event_opt) - self.transport.capture_envelope(envelope) + for attachment in attachments or (): + envelope.add_item(attachment.to_envelope_item()) - else: - if self.transport is None: - return None + if self.spotlight: + self.spotlight.capture_envelope(envelope) + + if self.transport is None: + return None - # All other events go to the legacy /store/ endpoint (will be removed in the future). - self.transport.capture_event(event_opt) + self.transport.capture_envelope(envelope) return event_id diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 1df8aaec6a..7c82eebd8c 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -1,3 +1,4 @@ +from enum import StrEnum from sentry_sdk._types import TYPE_CHECKING # up top to prevent circular import due to integration import @@ -236,6 +237,16 @@ class OP: SOCKET_DNS = "socket.dns" +class EndpointType(StrEnum): + """ + The type of an endpoint. This is an enum, rather than a constant, for historical reasons + (the old /store endpoint). The enum also preserve future compatibility, in case we ever + have a new endpoint. + """ + + ENVELOPE = "envelope" + + # This type exists to trick mypy and PyCharm into thinking `init` and `Client` # take these arguments (even though they take opaque **kwargs) class ClientConstructor: diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index 35e82a741d..7c91b01170 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -95,11 +95,12 @@ def add_item( def get_event(self): # type: (...) -> Optional[Event] - for items in self.items: - event = items.get_event() - if event is not None: - return event - return None + return next(self.events, None) + + @property + def events(self): + # type: () -> Iterator[Event] + return (item.get_event() for item in self.items if item.get_event() is not None) def get_transaction_event(self): # type: (...) -> Optional[Event] diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index cd33956f54..449f58cad8 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -1,4 +1,8 @@ +from __future__ import print_function + +from abc import ABC, abstractmethod import io +import warnings import urllib3 import certifi import gzip @@ -6,7 +10,8 @@ from datetime import datetime, timedelta, timezone from collections import defaultdict -from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps +from sentry_sdk.consts import EndpointType +from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions from sentry_sdk.worker import BackgroundWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from sentry_sdk._types import TYPE_CHECKING @@ -25,7 +30,7 @@ from urllib3.poolmanager import PoolManager from urllib3.poolmanager import ProxyManager - from sentry_sdk._types import Event, EndpointType + from sentry_sdk._types import Event DataCategory = Optional[str] @@ -35,12 +40,18 @@ from urllib import getproxies # type: ignore -class Transport: +class Transport(ABC): """Baseclass for all transports. A transport is used to send an event to sentry. """ + # TODO: For the items without default implementations, we may wish to mark them as abstract methods, + # or provide a default implementation. We could probably move implementations of some of the + # methods (such as `record_lost_event`) from the HttpTransport class to this class. + # However, marking methods as abstract methods is a breaking change, so we should consider + # whether we would like to make the change for the SDK 2.0 release. + parsed_dsn = None # type: Optional[Dsn] def __init__( @@ -58,11 +69,22 @@ def capture_event( ): # type: (...) -> None """ + DEPRECATED: Please use capture_envelope instead. + This gets invoked with the event dictionary when an event should be sent to sentry. """ - raise NotImplementedError() + warnings.warn( + "capture_event is deprecated, please use capture_envelope instead!", + DeprecationWarning, + ) + + envelope = Envelope() + envelope.add_event(event) + self.capture_envelope(envelope) + + @abstractmethod def capture_envelope( self, envelope # type: Envelope ): @@ -71,11 +93,10 @@ def capture_envelope( Send an envelope to Sentry. Envelopes are a data container format that can hold any type of data - submitted to Sentry. We use it for transactions and sessions, but - regular "error" events should go through `capture_event` for backwards - compat. + submitted to Sentry. We use it to send all event data (including errors, + transactions, crons checkins, etc.) to Sentry. """ - raise NotImplementedError() + pass def flush( self, @@ -216,7 +237,7 @@ def _send_request( self, body, # type: bytes headers, # type: Dict[str, str] - endpoint_type="store", # type: EndpointType + endpoint_type=EndpointType.ENVELOPE, # type: EndpointType envelope=None, # type: Optional[Envelope] ): # type: (...) -> None @@ -333,46 +354,6 @@ def is_healthy(self): # type: () -> bool return not (self._is_worker_full() or self._is_rate_limited()) - def _send_event( - self, event # type: Event - ): - # type: (...) -> None - - if self._check_disabled("error"): - self.on_dropped_event("self_rate_limits") - self.record_lost_event("ratelimit_backoff", data_category="error") - return None - - body = io.BytesIO() - if self._compresslevel == 0: - body.write(json_dumps(event)) - else: - with gzip.GzipFile( - fileobj=body, mode="w", compresslevel=self._compresslevel - ) as f: - f.write(json_dumps(event)) - - assert self.parsed_dsn is not None - logger.debug( - "Sending event, type:%s level:%s event_id:%s project:%s host:%s" - % ( - event.get("type") or "null", - event.get("level") or "null", - event.get("event_id") or "null", - self.parsed_dsn.project_id, - self.parsed_dsn.host, - ) - ) - - headers = { - "Content-Type": "application/json", - } - if self._compresslevel > 0: - headers["Content-Encoding"] = "gzip" - - self._send_request(body.getvalue(), headers=headers) - return None - def _send_envelope( self, envelope # type: Envelope ): @@ -430,7 +411,7 @@ def _send_envelope( self._send_request( body.getvalue(), headers=headers, - endpoint_type="envelope", + endpoint_type=EndpointType.ENVELOPE, envelope=envelope, ) return None @@ -501,23 +482,6 @@ def _make_pool( else: return urllib3.PoolManager(**opts) - def capture_event( - self, event # type: Event - ): - # type: (...) -> None - hub = self.hub_cls.current - - def send_event_wrapper(): - # type: () -> None - with hub: - with capture_internal_exceptions(): - self._send_event(event) - self._flush_client_reports() - - if not self._worker.submit(send_event_wrapper): - self.on_dropped_event("full_queue") - self.record_lost_event("queue_overflow", data_category="error") - def capture_envelope( self, envelope # type: Envelope ): @@ -555,6 +519,11 @@ def kill(self): class _FunctionTransport(Transport): + """ + DEPRECATED: Users wishing to provide a custom transport should subclass + the Transport class, rather than providing a function. + """ + def __init__( self, func # type: Callable[[Event], None] ): @@ -569,19 +538,31 @@ def capture_event( self._func(event) return None + def capture_envelope(self, envelope: Envelope) -> None: + # Since function transports expect to be called with an event, we need + # to iterate over the envelope and call the function for each event, via + # the deprecated capture_event method. + for event in envelope.events: + self.capture_event(event) + def make_transport(options): # type: (Dict[str, Any]) -> Optional[Transport] ref_transport = options["transport"] - # If no transport is given, we use the http transport class - if ref_transport is None: - transport_cls = HttpTransport # type: Type[Transport] - elif isinstance(ref_transport, Transport): + # By default, we use the http transport class + transport_cls = HttpTransport # type: Type[Transport] + + if isinstance(ref_transport, Transport): return ref_transport elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport): transport_cls = ref_transport elif callable(ref_transport): + warnings.warn( + "Function transports are deprecated and will be removed in a future release." + "Please provide a Transport instance or subclass, instead.", + DeprecationWarning, + ) return _FunctionTransport(ref_transport) # if a transport class is given only instantiate it if the dsn is not diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index 910238f004..de6ef983c3 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -27,7 +27,7 @@ import sentry_sdk from sentry_sdk._compat import PY37 from sentry_sdk._types import TYPE_CHECKING -from sentry_sdk.consts import DEFAULT_MAX_VALUE_LENGTH +from sentry_sdk.consts import DEFAULT_MAX_VALUE_LENGTH, EndpointType if TYPE_CHECKING: from types import FrameType, TracebackType @@ -46,7 +46,7 @@ Union, ) - from sentry_sdk._types import EndpointType, ExcInfo + from sentry_sdk._types import ExcInfo epoch = datetime(1970, 1, 1) @@ -305,17 +305,8 @@ def __init__( self.version = version self.client = client - @property - def store_api_url(self): - # type: () -> str - """Returns the API url for storing events. - - Deprecated: use get_api_url instead. - """ - return self.get_api_url(type="store") - def get_api_url( - self, type="store" # type: EndpointType + self, type=EndpointType.ENVELOPE # type: EndpointType ): # type: (...) -> str """Returns the API url for storing events.""" diff --git a/tests/conftest.py b/tests/conftest.py index 6dcda5a5c6..f2859ab1dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,7 +38,7 @@ from sentry_sdk.integrations import _processed_integrations # noqa: F401 from sentry_sdk.profiler import teardown_profiler from sentry_sdk.transport import Transport -from sentry_sdk.utils import capture_internal_exceptions, reraise +from sentry_sdk.utils import reraise from tests import _warning_recorder, _warning_recorder_mgr @@ -154,30 +154,9 @@ def _capture_internal_warnings(): @pytest.fixture -def monkeypatch_test_transport(monkeypatch, validate_event_schema): - def check_event(event): - def check_string_keys(map): - for key, value in map.items(): - assert isinstance(key, str) - if isinstance(value, dict): - check_string_keys(value) - - with capture_internal_exceptions(): - check_string_keys(event) - validate_event_schema(event) - - def check_envelope(envelope): - with capture_internal_exceptions(): - # There used to be a check here for errors are not sent in envelopes. - # We changed the behaviour to send errors in envelopes when tracing is enabled. - # This is checked in test_client.py::test_sending_events_with_tracing - # and test_client.py::test_sending_events_with_no_tracing - pass - +def monkeypatch_test_transport(monkeypatch): def inner(client): - monkeypatch.setattr( - client, "transport", TestTransport(check_event, check_envelope) - ) + monkeypatch.setattr(client, "transport", TestTransport()) return inner @@ -209,6 +188,7 @@ def inner(*a, **kw): client = sentry_sdk.Client(*a, **kw) hub.bind_client(client) if "transport" not in kw: + # TODO: Why do we even monkeypatch the transport, when we could just set it on the kw before constructing the Client? monkeypatch_test_transport(sentry_sdk.Hub.current.client) if request.node.get_closest_marker("forked"): @@ -222,11 +202,12 @@ def inner(*a, **kw): class TestTransport(Transport): - def __init__(self, capture_event_callback, capture_envelope_callback): + def __init__(self): Transport.__init__(self) - self.capture_event = capture_event_callback - self.capture_envelope = capture_envelope_callback - self._queue = None + + def capture_envelope(self, _: Envelope) -> None: + """No-op capture_envelope for tests""" + pass @pytest.fixture @@ -234,21 +215,16 @@ def capture_events(monkeypatch): def inner(): events = [] test_client = sentry_sdk.Hub.current.client - old_capture_event = test_client.transport.capture_event old_capture_envelope = test_client.transport.capture_envelope - def append_event(event): - events.append(event) - return old_capture_event(event) - - def append_envelope(envelope): + def append_event(envelope): for item in envelope: if item.headers.get("type") in ("event", "transaction"): - test_client.transport.capture_event(item.payload.json) + events.append(item.payload.json) return old_capture_envelope(envelope) - monkeypatch.setattr(test_client.transport, "capture_event", append_event) - monkeypatch.setattr(test_client.transport, "capture_envelope", append_envelope) + monkeypatch.setattr(test_client.transport, "capture_envelope", append_event) + return events return inner @@ -259,21 +235,14 @@ def capture_envelopes(monkeypatch): def inner(): envelopes = [] test_client = sentry_sdk.Hub.current.client - old_capture_event = test_client.transport.capture_event old_capture_envelope = test_client.transport.capture_envelope - def append_event(event): - envelope = Envelope() - envelope.add_event(event) - envelopes.append(envelope) - return old_capture_event(event) - def append_envelope(envelope): envelopes.append(envelope) return old_capture_envelope(envelope) - monkeypatch.setattr(test_client.transport, "capture_event", append_event) monkeypatch.setattr(test_client.transport, "capture_envelope", append_envelope) + return envelopes return inner diff --git a/tests/integrations/excepthook/test_excepthook.py b/tests/integrations/excepthook/test_excepthook.py index 18deccd76e..e8e96d8982 100644 --- a/tests/integrations/excepthook/test_excepthook.py +++ b/tests/integrations/excepthook/test_excepthook.py @@ -12,11 +12,12 @@ def test_excepthook(tmpdir): """ from sentry_sdk import init, transport - def send_event(self, event): - print("capture event was called") - print(event) + def capture_envelope(self, envelope): + print("capture_envelope was called") + for event in envelope.events: + print(event) - transport.HttpTransport._send_event = send_event + transport.HttpTransport.capture_envelope = capture_envelope init("http://foobar@localhost/123") @@ -35,7 +36,7 @@ def send_event(self, event): assert b"ZeroDivisionError" in output assert b"LOL" in output - assert b"capture event was called" in output + assert b"capture_envelope was called" in output def test_always_value_excepthook(tmpdir): @@ -47,11 +48,12 @@ def test_always_value_excepthook(tmpdir): from sentry_sdk import init, transport from sentry_sdk.integrations.excepthook import ExcepthookIntegration - def send_event(self, event): - print("capture event was called") - print(event) + def capture_envelope(self, envelope): + print("capture_envelope was called") + for event in envelope.events: + print(event) - transport.HttpTransport._send_event = send_event + transport.HttpTransport.capture_envelope = capture_envelope sys.ps1 = "always_value_test" init("http://foobar@localhost/123", @@ -73,4 +75,4 @@ def send_event(self, event): assert b"ZeroDivisionError" in output assert b"LOL" in output - assert b"capture event was called" in output + assert b"capture_envelope was called" in output diff --git a/tests/integrations/gcp/test_gcp.py b/tests/integrations/gcp/test_gcp.py index 678219dc8b..ed1204c9df 100644 --- a/tests/integrations/gcp/test_gcp.py +++ b/tests/integrations/gcp/test_gcp.py @@ -62,17 +62,9 @@ def envelope_processor(envelope): return item.get_bytes() class TestTransport(HttpTransport): - def _send_event(self, event): - event = event_processor(event) - # Writing a single string to stdout holds the GIL (seems like) and - # therefore cannot be interleaved with other threads. This is why we - # explicitly add a newline at the end even though `print` would provide - # us one. - print("\\nEVENT: {}\\n".format(json.dumps(event))) - - def _send_envelope(self, envelope): - envelope = envelope_processor(envelope) - print("\\nENVELOPE: {}\\n".format(envelope.decode(\"utf-8\"))) + def capture_envelope(self, envelope): + envelope_item = envelope_processor(envelope) + print("\\nENVELOPE: {}\\n".format(envelope_item.decode(\"utf-8\"))) def init_sdk(timeout_warning=False, **extra_init_args): @@ -93,8 +85,7 @@ def init_sdk(timeout_warning=False, **extra_init_args): @pytest.fixture def run_cloud_function(): def inner(code, subprocess_kwargs=()): - events = [] - envelopes = [] + envelope_items = [] return_value = None # STEP : Create a zip of cloud function @@ -130,12 +121,9 @@ def inner(code, subprocess_kwargs=()): for line in stream_data.splitlines(): print("GCP:", line) - if line.startswith("EVENT: "): - line = line[len("EVENT: ") :] - events.append(json.loads(line)) - elif line.startswith("ENVELOPE: "): + if line.startswith("ENVELOPE: "): line = line[len("ENVELOPE: ") :] - envelopes.append(json.loads(line)) + envelope_items.append(json.loads(line)) elif line.startswith("RETURN VALUE: "): line = line[len("RETURN VALUE: ") :] return_value = json.loads(line) @@ -144,13 +132,13 @@ def inner(code, subprocess_kwargs=()): stream.close() - return envelopes, events, return_value + return envelope_items, return_value return inner def test_handled_exception(run_cloud_function): - _, events, return_value = run_cloud_function( + envelope_items, return_value = run_cloud_function( dedent( """ functionhandler = None @@ -167,8 +155,8 @@ def cloud_function(functionhandler, event): """ ) ) - assert events[0]["level"] == "error" - (exception,) = events[0]["exception"]["values"] + assert envelope_items[0]["level"] == "error" + (exception,) = envelope_items[0]["exception"]["values"] assert exception["type"] == "Exception" assert exception["value"] == "something went wrong" @@ -177,7 +165,7 @@ def cloud_function(functionhandler, event): def test_unhandled_exception(run_cloud_function): - _, events, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -195,8 +183,8 @@ def cloud_function(functionhandler, event): """ ) ) - assert events[0]["level"] == "error" - (exception,) = events[0]["exception"]["values"] + assert envelope_items[0]["level"] == "error" + (exception,) = envelope_items[0]["exception"]["values"] assert exception["type"] == "ZeroDivisionError" assert exception["value"] == "division by zero" @@ -205,7 +193,7 @@ def cloud_function(functionhandler, event): def test_timeout_error(run_cloud_function): - _, events, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -223,8 +211,8 @@ def cloud_function(functionhandler, event): """ ) ) - assert events[0]["level"] == "error" - (exception,) = events[0]["exception"]["values"] + assert envelope_items[0]["level"] == "error" + (exception,) = envelope_items[0]["exception"]["values"] assert exception["type"] == "ServerlessTimeoutWarning" assert ( @@ -236,7 +224,7 @@ def cloud_function(functionhandler, event): def test_performance_no_error(run_cloud_function): - envelopes, _, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -254,15 +242,15 @@ def cloud_function(functionhandler, event): ) ) - assert envelopes[0]["type"] == "transaction" - assert envelopes[0]["contexts"]["trace"]["op"] == "function.gcp" - assert envelopes[0]["transaction"].startswith("Google Cloud function") - assert envelopes[0]["transaction_info"] == {"source": "component"} - assert envelopes[0]["transaction"] in envelopes[0]["request"]["url"] + assert envelope_items[0]["type"] == "transaction" + assert envelope_items[0]["contexts"]["trace"]["op"] == "function.gcp" + assert envelope_items[0]["transaction"].startswith("Google Cloud function") + assert envelope_items[0]["transaction_info"] == {"source": "component"} + assert envelope_items[0]["transaction"] in envelope_items[0]["request"]["url"] def test_performance_error(run_cloud_function): - envelopes, events, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -280,18 +268,18 @@ def cloud_function(functionhandler, event): ) ) - assert envelopes[0]["level"] == "error" - (exception,) = envelopes[0]["exception"]["values"] + assert envelope_items[0]["level"] == "error" + (exception,) = envelope_items[0]["exception"]["values"] assert exception["type"] == "Exception" assert exception["value"] == "something went wrong" assert exception["mechanism"]["type"] == "gcp" assert not exception["mechanism"]["handled"] - assert envelopes[1]["type"] == "transaction" - assert envelopes[1]["contexts"]["trace"]["op"] == "function.gcp" - assert envelopes[1]["transaction"].startswith("Google Cloud function") - assert envelopes[1]["transaction"] in envelopes[0]["request"]["url"] + assert envelope_items[1]["type"] == "transaction" + assert envelope_items[1]["contexts"]["trace"]["op"] == "function.gcp" + assert envelope_items[1]["transaction"].startswith("Google Cloud function") + assert envelope_items[1]["transaction"] in envelope_items[0]["request"]["url"] def test_traces_sampler_gets_correct_values_in_sampling_context( @@ -304,7 +292,7 @@ def test_traces_sampler_gets_correct_values_in_sampling_context( import inspect - envelopes, events, return_value = run_cloud_function( + _, return_value = run_cloud_function( dedent( """ functionhandler = None @@ -377,7 +365,7 @@ def test_error_has_new_trace_context_performance_enabled(run_cloud_function): """ Check if an 'trace' context is added to errros and transactions when performance monitoring is enabled. """ - envelopes, _, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -396,7 +384,7 @@ def cloud_function(functionhandler, event): """ ) ) - (msg_event, error_event, transaction_event) = envelopes + (msg_event, error_event, transaction_event) = envelope_items assert "trace" in msg_event["contexts"] assert "trace_id" in msg_event["contexts"]["trace"] @@ -418,7 +406,7 @@ def test_error_has_new_trace_context_performance_disabled(run_cloud_function): """ Check if an 'trace' context is added to errros and transactions when performance monitoring is disabled. """ - _, events, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -438,7 +426,7 @@ def cloud_function(functionhandler, event): ) ) - (msg_event, error_event) = events + (msg_event, error_event) = envelope_items assert "trace" in msg_event["contexts"] assert "trace_id" in msg_event["contexts"]["trace"] @@ -462,7 +450,7 @@ def test_error_has_existing_trace_context_performance_enabled(run_cloud_function parent_sampled = 1 sentry_trace_header = "{}-{}-{}".format(trace_id, parent_span_id, parent_sampled) - envelopes, _, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -486,7 +474,7 @@ def cloud_function(functionhandler, event): """ ) ) - (msg_event, error_event, transaction_event) = envelopes + (msg_event, error_event, transaction_event) = envelope_items assert "trace" in msg_event["contexts"] assert "trace_id" in msg_event["contexts"]["trace"] @@ -515,7 +503,7 @@ def test_error_has_existing_trace_context_performance_disabled(run_cloud_functio parent_sampled = 1 sentry_trace_header = "{}-{}-{}".format(trace_id, parent_span_id, parent_sampled) - _, events, _ = run_cloud_function( + envelope_items, _ = run_cloud_function( dedent( """ functionhandler = None @@ -539,7 +527,7 @@ def cloud_function(functionhandler, event): """ ) ) - (msg_event, error_event) = events + (msg_event, error_event) = envelope_items assert "trace" in msg_event["contexts"] assert "trace_id" in msg_event["contexts"]["trace"] diff --git a/tests/test_client.py b/tests/test_client.py index 530a7d8b65..b2f0133de1 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -16,7 +16,6 @@ capture_message, capture_exception, capture_event, - start_transaction, set_tag, ) from sentry_sdk.integrations.executing import ExecutingIntegration @@ -32,13 +31,13 @@ from sentry_sdk._types import Event -class EventCapturedError(Exception): +class EnvelopeCapturedError(Exception): pass class _TestTransport(Transport): - def capture_event(self, event): - raise EventCapturedError(event) + def capture_envelope(self, envelope): + raise EnvelopeCapturedError(envelope) def test_transport_option(monkeypatch): @@ -51,7 +50,7 @@ def test_transport_option(monkeypatch): assert Client().dsn is None monkeypatch.setenv("SENTRY_DSN", dsn) - transport = Transport({"dsn": dsn2}) + transport = _TestTransport({"dsn": dsn2}) assert str(transport.parsed_dsn) == dsn2 assert str(Client(transport=transport).dsn) == dsn @@ -360,7 +359,7 @@ def e(exc): e(ZeroDivisionError()) e(MyDivisionError()) - pytest.raises(EventCapturedError, lambda: e(ValueError())) + pytest.raises(EnvelopeCapturedError, lambda: e(ValueError())) def test_with_locals_deprecation_enabled(sentry_init): @@ -572,8 +571,8 @@ def test_attach_stacktrace_disabled(sentry_init, capture_events): def test_capture_event_works(sentry_init): sentry_init(transport=_TestTransport()) - pytest.raises(EventCapturedError, lambda: capture_event({})) - pytest.raises(EventCapturedError, lambda: capture_event({})) + pytest.raises(EnvelopeCapturedError, lambda: capture_event({})) + pytest.raises(EnvelopeCapturedError, lambda: capture_event({})) @pytest.mark.parametrize("num_messages", [10, 20]) @@ -585,11 +584,13 @@ def test_atexit(tmpdir, monkeypatch, num_messages): import time from sentry_sdk import init, transport, capture_message - def send_event(self, event): + def capture_envelope(self, envelope): time.sleep(0.1) - print(event["message"]) + event = next(envelope.events, dict()) + message = event.get("message", "") + print(message) - transport.HttpTransport._send_event = send_event + transport.HttpTransport.capture_envelope = capture_envelope init("http://foobar@localhost/123", shutdown_timeout={num_messages}) for _ in range({num_messages}): @@ -1000,91 +1001,6 @@ def test_init_string_types(dsn, sentry_init): ) -def test_sending_events_with_tracing(): - """ - Tests for calling the right transport method (capture_event vs - capture_envelope) from the SDK client for different data types. - """ - - envelopes = [] - events = [] - - class CustomTransport(Transport): - def capture_envelope(self, envelope): - envelopes.append(envelope) - - def capture_event(self, event): - events.append(event) - - with Hub(Client(enable_tracing=True, transport=CustomTransport())): - try: - 1 / 0 - except Exception: - event_id = capture_exception() - - # Assert error events get passed in via capture_envelope - assert not events - envelope = envelopes.pop() - (item,) = envelope.items - assert item.data_category == "error" - assert item.headers.get("type") == "event" - assert item.get_event()["event_id"] == event_id - - with start_transaction(name="foo"): - pass - - # Assert transactions get passed in via capture_envelope - assert not events - envelope = envelopes.pop() - - (item,) = envelope.items - assert item.data_category == "transaction" - assert item.headers.get("type") == "transaction" - - assert not envelopes - assert not events - - -def test_sending_events_with_no_tracing(): - """ - Tests for calling the right transport method (capture_event vs - capture_envelope) from the SDK client for different data types. - """ - - envelopes = [] - events = [] - - class CustomTransport(Transport): - def capture_envelope(self, envelope): - envelopes.append(envelope) - - def capture_event(self, event): - events.append(event) - - with Hub(Client(enable_tracing=False, transport=CustomTransport())): - try: - 1 / 0 - except Exception: - event_id = capture_exception() - - # Assert error events get passed in via capture_event - assert not envelopes - event = events.pop() - - assert event["event_id"] == event_id - assert "type" not in event - - with start_transaction(name="foo"): - pass - - # Assert transactions get passed in via capture_envelope - assert not events - assert not envelopes - - assert not envelopes - assert not events - - @pytest.mark.parametrize( "sdk_options, expected_breadcrumbs", [({}, DEFAULT_MAX_BREADCRUMBS), ({"max_breadcrumbs": 50}, 50)], diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 42d600ebbb..ad94ce95bd 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -10,10 +10,7 @@ class HealthyTestTransport(Transport): - def _send_event(self, event): - pass - - def _send_envelope(self, envelope): + def capture_envelope(self, _): pass def is_healthy(self): diff --git a/tests/test_transport.py b/tests/test_transport.py index c888b56803..0e21f4b292 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -128,7 +128,7 @@ def test_transport_works( assert capturing_server.captured assert capturing_server.captured[0].compressed == (compressionlevel > 0) - assert any("Sending event" in record.msg for record in caplog.records) == debug + assert any("Sending envelope" in record.msg for record in caplog.records) == debug @pytest.mark.parametrize( @@ -273,7 +273,7 @@ def record_lost_event(reason, data_category=None, item=None): client.flush() assert len(capturing_server.captured) == 1 - assert capturing_server.captured[0].path == "/api/132/store/" + assert capturing_server.captured[0].path == "/api/132/envelope/" assert captured_outcomes == [ ("ratelimit_backoff", "transaction"), @@ -352,7 +352,8 @@ def intercepting_fetch(*args, **kwargs): assert len(capturing_server.captured) == 2 - event = capturing_server.captured[0].event + assert len(capturing_server.captured[0].envelope.items) == 1 + event = capturing_server.captured[0].envelope.items[0].get_event() assert event["type"] == "error" assert event["release"] == "foo" From 7e3f22bd400ada7f66e1767127c5398ac4094a7b Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Fri, 19 Jan 2024 14:51:23 +0100 Subject: [PATCH 2/2] (WIP): Refactor transport.py --- sentry_sdk/transport.py | 173 ++++++++++++++++++++++++---------------- 1 file changed, 105 insertions(+), 68 deletions(-) diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 449f58cad8..dcc300d055 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -40,6 +40,82 @@ from urllib import getproxies # type: ignore +class LostEventRecorder: + """ + Captures lost events from the Transport, and converts them into a client report. + """ + + def __init__(self, options: Dict[str, Any]) -> None: + self._discarded_events = defaultdict( + int + ) # type: DefaultDict[Tuple[str, str], int] + self._last_client_report_generated = time.time() + self._options = options + + def record_lost_event( + self, + reason, # type: str + data_category=None, # type: Optional[str] + item=None, # type: Optional[Item] + ): + # type: (...) -> None + """This increments a counter for event loss by reason and + data category. + """ + if not self._options["send_client_reports"]: + return + + quantity = 1 + if item is not None: + data_category = item.data_category + if data_category == "attachment": + # quantity of 0 is actually 1 as we do not want to count + # empty attachments as actually empty. + quantity = len(item.get_bytes()) or 1 + elif data_category is None: + raise TypeError("data category not provided") + + self._discarded_events[data_category, reason] += quantity + + def fetch_pending_client_report(self, force=False, interval=60): + # type: (bool, int) -> Optional[Item] + """ + If the SDK is configured to send client reports, this method will generate an `Item` containing + information about discarded events that were recorded with `self.record_lost_event`. The report + is only generated when called with `force=True` or when the last report was generated more than + `interval` seconds ago; otherwise, this method returns `None`. The discarded events are cleared + afterwards, so that they are not included in the next report. + """ + if not self._options["send_client_reports"]: + return None + + if not (force or self._last_client_report_generated < time.time() - interval): + return None + + discarded_events = self._discarded_events + self._discarded_events = defaultdict(int) + self._last_client_report_generated = time.time() + + if not discarded_events: + return None + + return Item( + PayloadRef( + json={ + "timestamp": time.time(), + "discarded_events": [ + {"reason": reason, "category": category, "quantity": quantity} + for ( + (category, reason), + quantity, + ) in discarded_events.items() + ], + } + ), + type="client_report", + ) + + class Transport(ABC): """Baseclass for all transports. @@ -55,15 +131,25 @@ class Transport(ABC): parsed_dsn = None # type: Optional[Dsn] def __init__( - self, options=None # type: Optional[Dict[str, Any]] + self, + options: Dict[str, Any], + lost_event_recorder: Optional[LostEventRecorder] = None, ): # type: (...) -> None + """ + Initializes a transport. Two op + """ self.options = options if options and options["dsn"] is not None and options["dsn"]: self.parsed_dsn = Dsn(options["dsn"]) else: self.parsed_dsn = None + if lost_event_recorder is None: + lost_event_recorder = LostEventRecorder(options or {}) + + self.lost_event_recorder = lost_event_recorder + def capture_event( self, event # type: Event ): @@ -98,6 +184,19 @@ def capture_envelope( """ pass + def flush_client_reports(self, force=False): + # type: (bool) -> None + """ + Calls `self.fetch_pending_client_report` and checks the return value. If the return value is an + `Item`, then this method will capture an envelope containing the `Item`. + """ + client_report = self.lost_event_recorder.fetch_pending_client_report( + force=force + ) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + @abstractmethod def flush( self, timeout, # type: float @@ -107,6 +206,7 @@ def flush( """Wait `timeout` seconds for the current events to be sent out.""" pass + @abstractmethod def kill(self): # type: () -> None """Forcefully kills the transport.""" @@ -122,7 +222,7 @@ def record_lost_event( """This increments a counter for event loss by reason and data category. """ - return None + self.lost_event_recorder.record_lost_event(reason, data_category, item) def is_healthy(self): # type: () -> bool @@ -167,10 +267,6 @@ def __init__( self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until = {} # type: Dict[DataCategory, datetime] self._retry = urllib3.util.Retry() - self._discarded_events = defaultdict( - int - ) # type: DefaultDict[Tuple[str, str], int] - self._last_client_report_sent = time.time() compresslevel = options.get("_experiments", {}).get( "transport_zlib_compression_level" @@ -192,28 +288,6 @@ def __init__( self.hub_cls = Hub - def record_lost_event( - self, - reason, # type: str - data_category=None, # type: Optional[str] - item=None, # type: Optional[Item] - ): - # type: (...) -> None - if not self.options["send_client_reports"]: - return - - quantity = 1 - if item is not None: - data_category = item.data_category - if data_category == "attachment": - # quantity of 0 is actually 1 as we do not want to count - # empty attachments as actually empty. - quantity = len(item.get_bytes()) or 1 - elif data_category is None: - raise TypeError("data category not provided") - - self._discarded_events[data_category, reason] += quantity - def _update_rate_limits(self, response): # type: (urllib3.BaseHTTPResponse) -> None @@ -294,43 +368,6 @@ def on_dropped_event(self, reason): # type: (str) -> None return None - def _fetch_pending_client_report(self, force=False, interval=60): - # type: (bool, int) -> Optional[Item] - if not self.options["send_client_reports"]: - return None - - if not (force or self._last_client_report_sent < time.time() - interval): - return None - - discarded_events = self._discarded_events - self._discarded_events = defaultdict(int) - self._last_client_report_sent = time.time() - - if not discarded_events: - return None - - return Item( - PayloadRef( - json={ - "timestamp": time.time(), - "discarded_events": [ - {"reason": reason, "category": category, "quantity": quantity} - for ( - (category, reason), - quantity, - ) in discarded_events.items() - ], - } - ), - type="client_report", - ) - - def _flush_client_reports(self, force=False): - # type: (bool) -> None - client_report = self._fetch_pending_client_report(force=force, interval=60) - if client_report is not None: - self.capture_envelope(Envelope(items=[client_report])) - def _check_disabled(self, category): # type: (str) -> bool def _disabled(bucket): @@ -381,7 +418,7 @@ def _send_envelope( # can attach it to this enveloped scheduled for sending. This will # currently typically attach the client report to the most recent # session update. - client_report_item = self._fetch_pending_client_report(interval=30) + client_report_item = self.fetch_pending_client_report(interval=30) if client_report_item is not None: envelope.items.append(client_report_item) @@ -493,7 +530,7 @@ def send_envelope_wrapper(): with hub: with capture_internal_exceptions(): self._send_envelope(envelope) - self._flush_client_reports() + self.flush_client_reports() if not self._worker.submit(send_envelope_wrapper): self.on_dropped_event("full_queue") @@ -509,7 +546,7 @@ def flush( logger.debug("Flushing HTTP transport") if timeout > 0: - self._worker.submit(lambda: self._flush_client_reports(force=True)) + self._worker.submit(lambda: self.flush_client_reports(force=True)) self._worker.flush(timeout, callback) def kill(self):