diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 5e06d77e74aa..60713c9fc476 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3340,16 +3340,6 @@ default=False, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) -register( - "spans.buffer.flusher-cumulative-logger-enabled", - default=False, - flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, -) -register( - "spans.buffer.flusher.log-flushed-segments", - default=False, - flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, -) # List of trace_ids to enable debug logging for. Empty = debug off. # When set, logs detailed metrics about zunionstore set sizes, key existence, and trace structure. diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index 8ce5aeafb4c7..2376d05f7b3b 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -95,7 +95,6 @@ import itertools import logging import math -import time import uuid from collections.abc import Generator, MutableMapping, Sequence from hashlib import blake2b @@ -115,8 +114,6 @@ from sentry.spans.buffer_logger import ( BufferLogger, EvalshaData, - FlusherLogEntry, - FlusherLogger, emit_observability_metrics, ) from sentry.spans.consumers.process_segments.types import attribute_value @@ -242,12 +239,10 @@ def __init__(self, assigned_shards: list[int], slice_id: int | None = None): self.slice_id = slice_id self.add_buffer_sha: str | None = None self.any_shard_at_limit = False - self._last_decompress_latency_ms = 0 self._current_compression_level = None self._zstd_compressor: zstandard.ZstdCompressor | None = None self._zstd_decompressor = zstandard.ZstdDecompressor() self._buffer_logger = BufferLogger() - self._flusher_logger = FlusherLogger() self._debug_trace_logger: DebugTraceLogger | None = None @cached_property @@ -636,10 +631,8 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: queue_keys = [] shard_factor = max(1, len(self.assigned_shards)) max_flush_segments = options.get("spans.buffer.max-flush-segments") - flusher_logger_enabled = options.get("spans.buffer.flusher-cumulative-logger-enabled") max_segments_per_shard = math.ceil(max_flush_segments / shard_factor) - ids_start = time.monotonic() with metrics.timer("spans.buffer.flush_segments.load_segment_ids"): with self.client.pipeline(transaction=False) as p: for shard in self.assigned_shards: @@ -650,7 +643,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: queue_keys.append(key) result = p.execute() - load_ids_latency_ms = int((time.monotonic() - ids_start) * 1000) segment_keys: list[tuple[int, QueueKey, SegmentKey, float]] = [] for shard, queue_key, keys_with_scores in zip(self.assigned_shards, queue_keys, result): @@ -660,7 +652,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: acquired_locks = self._acquire_flush_locks([k for _, _, k, _ in segment_keys]) segment_keys = [entry for entry in segment_keys if entry[2] in acquired_locks] - data_start = time.monotonic() with metrics.timer("spans.buffer.flush_segments.load_segment_data"): # Pass queue mapping to enable TTL expiration detection segment_to_queue = { @@ -671,12 +662,10 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: segment_to_queue, now, ) - load_data_latency_ms = int((time.monotonic() - data_start) * 1000) return_segments = {} num_has_root_spans = 0 any_shard_at_limit = False - flusher_log_entries: list[FlusherLogEntry] = [] for shard, queue_key, segment_key, score in segment_keys: segment_span_id = segment_key_to_span_id(segment_key).decode("ascii") @@ -735,25 +724,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: except Exception: logger.exception("flush_segments: Failed to log debug trace flush info") - if flusher_logger_enabled and segment: - project_id, trace_id, _ = parse_segment_key(segment_key) - project_and_trace = f"{project_id.decode('ascii')}:{trace_id.decode('ascii')}" - flusher_log_entries.append( - FlusherLogEntry( - project_and_trace, - len(segment), - sum(len(s) for s in segment), - ) - ) - - if flusher_logger_enabled and flusher_log_entries: - self._flusher_logger.log( - flusher_log_entries, - load_ids_latency_ms, - load_data_latency_ms, - self._last_decompress_latency_ms, - ) - metrics.timing("spans.buffer.flush_segments.num_segments", len(return_segments)) metrics.timing("spans.buffer.flush_segments.has_root_span", num_has_root_spans) @@ -780,8 +750,6 @@ def _load_segment_data( payloads: dict[SegmentKey, list[bytes]] = {key: [] for key in segment_keys} payload_keys_map: dict[SegmentKey, list[PayloadKey]] = {key: [] for key in segment_keys} - self._last_decompress_latency_ms = 0 - decompress_latency_ms = 0.0 # Maps each payload key back to the segment it belongs to. # Multiple distributed payload keys map to one segment. @@ -812,11 +780,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes): """ Decompress and add spans to the segment. """ - nonlocal decompress_latency_ms - - decompress_start = time.monotonic() decompressed = self._decompress_batch(raw_data) - decompress_latency_ms += (time.monotonic() - decompress_start) * 1000 payloads[key].extend(decompressed) while cursors: @@ -923,8 +887,6 @@ def _add_spans(key: SegmentKey, raw_data: bytes): # worst-case. metrics.incr("spans.buffer.empty_segments") - self._last_decompress_latency_ms = int(decompress_latency_ms) - return payloads, payload_keys_map def done_flush_segments(self, segment_keys: dict[SegmentKey, FlushedSegment]): diff --git a/src/sentry/spans/buffer_logger.py b/src/sentry/spans/buffer_logger.py index c0bc3b6f7d42..0d065a307cf8 100644 --- a/src/sentry/spans/buffer_logger.py +++ b/src/sentry/spans/buffer_logger.py @@ -14,16 +14,6 @@ LOGGING_INTERVAL = 60 # 1 minute in seconds -class FlusherLogEntry(NamedTuple): - """ - Represents a single flush operation for a given project and trace. - """ - - project_and_trace: str - span_count: int - bytes_flushed: int - - class BufferAggregate(NamedTuple): """ Tracks the number of operations and cumulative latency for a given project and trace. @@ -33,16 +23,6 @@ class BufferAggregate(NamedTuple): cumulative_latency_ms: int -class FlusherAggregate(NamedTuple): - """ - Tracks the number of segments, spans, and bytes flushed for a given project and trace. - """ - - segment_count: int - span_count: int - bytes_flushed: int - - TAggregate = TypeVar("TAggregate", bound=tuple[Any, ...]) @@ -143,79 +123,6 @@ def log(self, entries: list[tuple[str, int]]) -> None: ) -class FlusherLogger: - """ - Tracks per-trace flush operations and logs the dominant traces by - cumulative bytes flushed. - - This logger keeps a bounded map (max 50 entries) of project_and_trace keys - to their segment counts, span counts, and cumulative bytes. - Every minute the top 50 traces by cumulative bytes are logged at INFO level, - along with the cumulative per-phase latencies over the logging interval. - """ - - def __init__(self) -> None: - self._metrics_per_trace: dict[str, FlusherAggregate] = {} - self._cumulative_load_ids_latency_ms: int = 0 - self._cumulative_load_data_latency_ms: int = 0 - self._cumulative_decompress_latency_ms: int = 0 - self._last_log_time: float | None = None - - def log( - self, - entries: list[FlusherLogEntry], - load_ids_latency_ms: int, - load_data_latency_ms: int, - decompress_latency_ms: int, - ) -> None: - """ - Record a batch of flush operations and periodically log the top traces sorted by - cumulative bytes flushed. - """ - - if not options.get("spans.buffer.flusher-cumulative-logger-enabled"): - return - - self._cumulative_load_ids_latency_ms += load_ids_latency_ms - self._cumulative_load_data_latency_ms += load_data_latency_ms - self._cumulative_decompress_latency_ms += decompress_latency_ms - - for entry in entries: - if entry.project_and_trace in self._metrics_per_trace: - aggregate = self._metrics_per_trace[entry.project_and_trace] - self._metrics_per_trace[entry.project_and_trace] = FlusherAggregate( - aggregate.segment_count + 1, - aggregate.span_count + entry.span_count, - aggregate.bytes_flushed + entry.bytes_flushed, - ) - else: - self._metrics_per_trace[entry.project_and_trace] = FlusherAggregate( - 1, - entry.span_count, - entry.bytes_flushed, - ) - - self._last_log_time = _prune_and_maybe_log( - self._metrics_per_trace, - self._last_log_time, - sort_index=2, - log_message="spans.buffer.top_flush_operations_by_bytes", - entries_key="top_flush_operations", - format_entry=lambda key, val: ( - f"{key}:{val.segment_count}:{val.span_count}:{val.bytes_flushed}" - ), - extra={ - "cumulative_load_ids_latency_ms": self._cumulative_load_ids_latency_ms, - "cumulative_load_data_latency_ms": self._cumulative_load_data_latency_ms, - "cumulative_decompress_latency_ms": self._cumulative_decompress_latency_ms, - }, - ) - if self._last_log_time is None: - self._cumulative_load_ids_latency_ms = 0 - self._cumulative_load_data_latency_ms = 0 - self._cumulative_decompress_latency_ms = 0 - - type DataPoint = tuple[bytes, float] type EvalshaData = list[DataPoint] diff --git a/src/sentry/spans/consumers/process/flusher.py b/src/sentry/spans/consumers/process/flusher.py index d787103b9d50..c5330d450cfe 100644 --- a/src/sentry/spans/consumers/process/flusher.py +++ b/src/sentry/spans/consumers/process/flusher.py @@ -330,25 +330,10 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None: continue with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): - log_flushed_segments = options.get("spans.buffer.flusher.log-flushed-segments") - for segment_key, flushed_segment in flushed_segments.items(): if not flushed_segment.spans: continue - if log_flushed_segments: - logger.info( - "spans.buffer.flushed_segment", - extra={ - "segment_key": segment_key.decode("utf-8", errors="replace"), - "queue_key": flushed_segment.queue_key.decode( - "utf-8", errors="replace" - ), - "span_count": len(flushed_segment.spans), - "project_id": flushed_segment.project_id, - }, - ) - for message in flushed_segment.to_messages(): kafka_payload = KafkaPayload(None, orjson.dumps(message), []) metrics.timing( diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 3a1f7b5becc6..c882e60af692 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -32,15 +32,13 @@ "spans.buffer.flusher.max-unhealthy-seconds": 60, "spans.buffer.flusher.use-stuck-detector": False, "spans.buffer.flusher.flush-lock-ttl": 0, - "spans.buffer.flusher-cumulative-logger-enabled": False, - "spans.buffer.flusher.log-flushed-segments": False, "spans.buffer.done-flush-conditional-zrem": True, "spans.buffer.compression.level": 0, "spans.buffer.pipeline-batch-size": 0, "spans.buffer.max-spans-per-evalsha": 0, "spans.buffer.evalsha-latency-threshold": 100, - "spans.buffer.debug-traces": [], "spans.buffer.evalsha-cumulative-logger-enabled": True, + "spans.buffer.debug-traces": [], "spans.process-segments.schema-validation": 1.0, } diff --git a/tests/sentry/spans/test_buffer_logger.py b/tests/sentry/spans/test_buffer_logger.py index deeb364187aa..aec4529c7cef 100644 --- a/tests/sentry/spans/test_buffer_logger.py +++ b/tests/sentry/spans/test_buffer_logger.py @@ -5,8 +5,6 @@ from sentry.spans.buffer_logger import ( BufferLogger, - FlusherLogEntry, - FlusherLogger, emit_observability_metrics, ) from sentry.testutils.helpers.options import override_options @@ -128,134 +126,6 @@ def test_no_logging_when_no_data(mock_logger): assert mock_logger.info.call_count == 0 -@mock.patch("sentry.spans.buffer_logger.time") -def test_flusher_logger_accumulates_segments_and_spans(mock_time): - """ - Test that FlusherLogger accumulates segment count, span count, and bytes - across multiple calls for the same trace, and tracks cumulative flush latency. - """ - with override_options({"spans.buffer.flusher-cumulative-logger-enabled": True}): - mock_time.time.return_value = 1000.0 - - flusher_logger = FlusherLogger() - - flusher_logger.log( - [ - FlusherLogEntry("project1:trace1", 10, 500), - FlusherLogEntry("project1:trace1", 20, 800), - FlusherLogEntry("project2:trace2", 5, 200), - ], - load_ids_latency_ms=5, - load_data_latency_ms=10, - decompress_latency_ms=3, - ) - - project1 = flusher_logger._metrics_per_trace["project1:trace1"] - project2 = flusher_logger._metrics_per_trace["project2:trace2"] - assert project1.segment_count == 2 - assert project1.span_count == 30 - assert project1.bytes_flushed == 1300 - assert project2.segment_count == 1 - assert project2.span_count == 5 - assert project2.bytes_flushed == 200 - assert flusher_logger._cumulative_load_ids_latency_ms == 5 - assert flusher_logger._cumulative_load_data_latency_ms == 10 - assert flusher_logger._cumulative_decompress_latency_ms == 3 - - flusher_logger.log( - [FlusherLogEntry("project1:trace1", 15, 600)], - load_ids_latency_ms=3, - load_data_latency_ms=7, - decompress_latency_ms=2, - ) - - project1 = flusher_logger._metrics_per_trace["project1:trace1"] - assert project1.segment_count == 3 - assert project1.span_count == 45 - assert project1.bytes_flushed == 1900 - assert flusher_logger._cumulative_load_ids_latency_ms == 8 - assert flusher_logger._cumulative_load_data_latency_ms == 17 - assert flusher_logger._cumulative_decompress_latency_ms == 5 - - -@mock.patch("sentry.spans.buffer_logger.time") -def test_flusher_logger_prunes_to_top_50_by_bytes(mock_time): - """ - Test that FlusherLogger prunes to top 50 entries by cumulative bytes - when exceeding MAX_ENTRIES. - """ - with override_options({"spans.buffer.flusher-cumulative-logger-enabled": True}): - mock_time.time.return_value = 1000.0 - - flusher_logger = FlusherLogger() - - entries = [FlusherLogEntry(f"project{i}:trace{i}", 10, 1000 - i) for i in range(500)] - flusher_logger.log( - entries, load_ids_latency_ms=20, load_data_latency_ms=30, decompress_latency_ms=10 - ) - - assert len(flusher_logger._metrics_per_trace) == 50 - assert "project0:trace0" in flusher_logger._metrics_per_trace - assert "project49:trace49" in flusher_logger._metrics_per_trace - assert "project50:trace50" not in flusher_logger._metrics_per_trace - assert "project499:trace499" not in flusher_logger._metrics_per_trace - - -@mock.patch("sentry.spans.buffer_logger.logger") -@mock.patch("sentry.spans.buffer_logger.time") -def test_flusher_logger_logs_and_resets_after_interval(mock_time, mock_logger): - """ - Test that FlusherLogger logs entries after the 60s interval and resets state, - including cumulative flush latency as a top-level field. - """ - with override_options({"spans.buffer.flusher-cumulative-logger-enabled": True}): - mock_time.time.side_effect = [ - 1000.0, - 1000.0, - 1061.0, - ] - - flusher_logger = FlusherLogger() - - flusher_logger.log( - [ - FlusherLogEntry("project1:trace1", 10, 500), - FlusherLogEntry("project2:trace2", 5, 200), - ], - load_ids_latency_ms=20, - load_data_latency_ms=30, - decompress_latency_ms=8, - ) - - assert mock_logger.info.call_count == 0 - - flusher_logger.log( - [FlusherLogEntry("project1:trace1", 8, 400)], - load_ids_latency_ms=10, - load_data_latency_ms=20, - decompress_latency_ms=5, - ) - - assert mock_logger.info.call_count == 1 - call_args = mock_logger.info.call_args - assert call_args[0][0] == "spans.buffer.top_flush_operations_by_bytes" - - extra = call_args[1]["extra"] - entries_list = extra["top_flush_operations"] - assert len(entries_list) == 2 - assert entries_list[0] == "project1:trace1:2:18:900" - assert entries_list[1] == "project2:trace2:1:5:200" - assert extra["cumulative_load_ids_latency_ms"] == 30 - assert extra["cumulative_load_data_latency_ms"] == 50 - assert extra["cumulative_decompress_latency_ms"] == 13 - - assert len(flusher_logger._metrics_per_trace) == 0 - assert flusher_logger._cumulative_load_ids_latency_ms == 0 - assert flusher_logger._cumulative_load_data_latency_ms == 0 - assert flusher_logger._cumulative_decompress_latency_ms == 0 - assert flusher_logger._last_log_time is None - - class TestEmitObservabilityMetrics: def data(self): return {