Skip to content

Commit

Permalink
(WIP): Refactor transport.py
Browse files Browse the repository at this point in the history
  • Loading branch information
szokeasaurusrex committed Jan 19, 2024
1 parent b4d5997 commit 7e3f22b
Showing 1 changed file with 105 additions and 68 deletions.
173 changes: 105 additions & 68 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,82 @@
from urllib import getproxies # type: ignore


class LostEventRecorder:
"""
Captures lost events from the Transport, and converts them into a client report.
"""

def __init__(self, options: Dict[str, Any]) -> None:
self._discarded_events = defaultdict(
int
) # type: DefaultDict[Tuple[str, str], int]
self._last_client_report_generated = time.time()
self._options = options

def record_lost_event(
self,
reason, # type: str
data_category=None, # type: Optional[str]
item=None, # type: Optional[Item]
):
# type: (...) -> None
"""This increments a counter for event loss by reason and
data category.
"""
if not self._options["send_client_reports"]:
return

quantity = 1
if item is not None:
data_category = item.data_category
if data_category == "attachment":
# quantity of 0 is actually 1 as we do not want to count
# empty attachments as actually empty.
quantity = len(item.get_bytes()) or 1
elif data_category is None:
raise TypeError("data category not provided")

self._discarded_events[data_category, reason] += quantity

def fetch_pending_client_report(self, force=False, interval=60):
# type: (bool, int) -> Optional[Item]
"""
If the SDK is configured to send client reports, this method will generate an `Item` containing
information about discarded events that were recorded with `self.record_lost_event`. The report
is only generated when called with `force=True` or when the last report was generated more than
`interval` seconds ago; otherwise, this method returns `None`. The discarded events are cleared
afterwards, so that they are not included in the next report.
"""
if not self._options["send_client_reports"]:
return None

if not (force or self._last_client_report_generated < time.time() - interval):
return None

discarded_events = self._discarded_events
self._discarded_events = defaultdict(int)
self._last_client_report_generated = time.time()

if not discarded_events:
return None

return Item(
PayloadRef(
json={
"timestamp": time.time(),
"discarded_events": [
{"reason": reason, "category": category, "quantity": quantity}
for (
(category, reason),
quantity,
) in discarded_events.items()
],
}
),
type="client_report",
)


class Transport(ABC):
"""Baseclass for all transports.
Expand All @@ -55,15 +131,25 @@ class Transport(ABC):
parsed_dsn = None # type: Optional[Dsn]

def __init__(
self, options=None # type: Optional[Dict[str, Any]]
self,
options: Dict[str, Any],
lost_event_recorder: Optional[LostEventRecorder] = None,
):
# type: (...) -> None
"""
Initializes a transport. Two op
"""
self.options = options
if options and options["dsn"] is not None and options["dsn"]:
self.parsed_dsn = Dsn(options["dsn"])
else:
self.parsed_dsn = None

if lost_event_recorder is None:
lost_event_recorder = LostEventRecorder(options or {})

self.lost_event_recorder = lost_event_recorder

def capture_event(
self, event # type: Event
):
Expand Down Expand Up @@ -98,6 +184,19 @@ def capture_envelope(
"""
pass

def flush_client_reports(self, force=False):
# type: (bool) -> None
"""
Calls `self.fetch_pending_client_report` and checks the return value. If the return value is an
`Item`, then this method will capture an envelope containing the `Item`.
"""
client_report = self.lost_event_recorder.fetch_pending_client_report(
force=force
)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

@abstractmethod
def flush(
self,
timeout, # type: float
Expand All @@ -107,6 +206,7 @@ def flush(
"""Wait `timeout` seconds for the current events to be sent out."""
pass

@abstractmethod
def kill(self):
# type: () -> None
"""Forcefully kills the transport."""
Expand All @@ -122,7 +222,7 @@ def record_lost_event(
"""This increments a counter for event loss by reason and
data category.
"""
return None
self.lost_event_recorder.record_lost_event(reason, data_category, item)

def is_healthy(self):
# type: () -> bool
Expand Down Expand Up @@ -167,10 +267,6 @@ def __init__(
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until = {} # type: Dict[DataCategory, datetime]
self._retry = urllib3.util.Retry()
self._discarded_events = defaultdict(
int
) # type: DefaultDict[Tuple[str, str], int]
self._last_client_report_sent = time.time()

compresslevel = options.get("_experiments", {}).get(
"transport_zlib_compression_level"
Expand All @@ -192,28 +288,6 @@ def __init__(

self.hub_cls = Hub

def record_lost_event(
self,
reason, # type: str
data_category=None, # type: Optional[str]
item=None, # type: Optional[Item]
):
# type: (...) -> None
if not self.options["send_client_reports"]:
return

quantity = 1
if item is not None:
data_category = item.data_category
if data_category == "attachment":
# quantity of 0 is actually 1 as we do not want to count
# empty attachments as actually empty.
quantity = len(item.get_bytes()) or 1
elif data_category is None:
raise TypeError("data category not provided")

self._discarded_events[data_category, reason] += quantity

def _update_rate_limits(self, response):
# type: (urllib3.BaseHTTPResponse) -> None

Expand Down Expand Up @@ -294,43 +368,6 @@ def on_dropped_event(self, reason):
# type: (str) -> None
return None

def _fetch_pending_client_report(self, force=False, interval=60):
# type: (bool, int) -> Optional[Item]
if not self.options["send_client_reports"]:
return None

if not (force or self._last_client_report_sent < time.time() - interval):
return None

discarded_events = self._discarded_events
self._discarded_events = defaultdict(int)
self._last_client_report_sent = time.time()

if not discarded_events:
return None

return Item(
PayloadRef(
json={
"timestamp": time.time(),
"discarded_events": [
{"reason": reason, "category": category, "quantity": quantity}
for (
(category, reason),
quantity,
) in discarded_events.items()
],
}
),
type="client_report",
)

def _flush_client_reports(self, force=False):
# type: (bool) -> None
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def _check_disabled(self, category):
# type: (str) -> bool
def _disabled(bucket):
Expand Down Expand Up @@ -381,7 +418,7 @@ def _send_envelope(
# can attach it to this enveloped scheduled for sending. This will
# currently typically attach the client report to the most recent
# session update.
client_report_item = self._fetch_pending_client_report(interval=30)
client_report_item = self.fetch_pending_client_report(interval=30)
if client_report_item is not None:
envelope.items.append(client_report_item)

Expand Down Expand Up @@ -493,7 +530,7 @@ def send_envelope_wrapper():
with hub:
with capture_internal_exceptions():
self._send_envelope(envelope)
self._flush_client_reports()
self.flush_client_reports()

if not self._worker.submit(send_envelope_wrapper):
self.on_dropped_event("full_queue")
Expand All @@ -509,7 +546,7 @@ def flush(
logger.debug("Flushing HTTP transport")

if timeout > 0:
self._worker.submit(lambda: self._flush_client_reports(force=True))
self._worker.submit(lambda: self.flush_client_reports(force=True))
self._worker.flush(timeout, callback)

def kill(self):
Expand Down

0 comments on commit 7e3f22b

Please sign in to comment.