Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3340,16 +3340,6 @@
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Gates the flusher's cumulative per-trace logging: flush_segments only collects
# FlusherLogEntry records, and FlusherLogger.log only aggregates them, when this is on.
register(
"spans.buffer.flusher-cumulative-logger-enabled",
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# When enabled, the flusher emits one "spans.buffer.flushed_segment" info log per
# non-empty flushed segment (segment key, queue key, span count, project id).
register(
"spans.buffer.flusher.log-flushed-segments",
default=False,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)

# List of trace_ids to enable debug logging for. Empty = debug off.
# When set, logs detailed metrics about zunionstore set sizes, key existence, and trace structure.
Expand Down
38 changes: 0 additions & 38 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import itertools
import logging
import math
import time
import uuid
from collections.abc import Generator, MutableMapping, Sequence
from hashlib import blake2b
Expand All @@ -115,8 +114,6 @@
from sentry.spans.buffer_logger import (
BufferLogger,
EvalshaData,
FlusherLogEntry,
FlusherLogger,
emit_observability_metrics,
)
from sentry.spans.consumers.process_segments.types import attribute_value
Expand Down Expand Up @@ -242,12 +239,10 @@ def __init__(self, assigned_shards: list[int], slice_id: int | None = None):
self.slice_id = slice_id
self.add_buffer_sha: str | None = None
self.any_shard_at_limit = False
self._last_decompress_latency_ms = 0
self._current_compression_level = None
self._zstd_compressor: zstandard.ZstdCompressor | None = None
self._zstd_decompressor = zstandard.ZstdDecompressor()
self._buffer_logger = BufferLogger()
self._flusher_logger = FlusherLogger()
self._debug_trace_logger: DebugTraceLogger | None = None

@cached_property
Expand Down Expand Up @@ -636,10 +631,8 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
queue_keys = []
shard_factor = max(1, len(self.assigned_shards))
max_flush_segments = options.get("spans.buffer.max-flush-segments")
flusher_logger_enabled = options.get("spans.buffer.flusher-cumulative-logger-enabled")
max_segments_per_shard = math.ceil(max_flush_segments / shard_factor)

ids_start = time.monotonic()
with metrics.timer("spans.buffer.flush_segments.load_segment_ids"):
with self.client.pipeline(transaction=False) as p:
for shard in self.assigned_shards:
Expand All @@ -650,7 +643,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
queue_keys.append(key)

result = p.execute()
load_ids_latency_ms = int((time.monotonic() - ids_start) * 1000)

segment_keys: list[tuple[int, QueueKey, SegmentKey, float]] = []
for shard, queue_key, keys_with_scores in zip(self.assigned_shards, queue_keys, result):
Expand All @@ -660,7 +652,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
acquired_locks = self._acquire_flush_locks([k for _, _, k, _ in segment_keys])
segment_keys = [entry for entry in segment_keys if entry[2] in acquired_locks]

data_start = time.monotonic()
with metrics.timer("spans.buffer.flush_segments.load_segment_data"):
# Pass queue mapping to enable TTL expiration detection
segment_to_queue = {
Expand All @@ -671,12 +662,10 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
segment_to_queue,
now,
)
load_data_latency_ms = int((time.monotonic() - data_start) * 1000)

return_segments = {}
num_has_root_spans = 0
any_shard_at_limit = False
flusher_log_entries: list[FlusherLogEntry] = []

for shard, queue_key, segment_key, score in segment_keys:
segment_span_id = segment_key_to_span_id(segment_key).decode("ascii")
Expand Down Expand Up @@ -735,25 +724,6 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
except Exception:
logger.exception("flush_segments: Failed to log debug trace flush info")

if flusher_logger_enabled and segment:
project_id, trace_id, _ = parse_segment_key(segment_key)
project_and_trace = f"{project_id.decode('ascii')}:{trace_id.decode('ascii')}"
flusher_log_entries.append(
FlusherLogEntry(
project_and_trace,
len(segment),
sum(len(s) for s in segment),
)
)

if flusher_logger_enabled and flusher_log_entries:
self._flusher_logger.log(
flusher_log_entries,
load_ids_latency_ms,
load_data_latency_ms,
self._last_decompress_latency_ms,
)

metrics.timing("spans.buffer.flush_segments.num_segments", len(return_segments))
metrics.timing("spans.buffer.flush_segments.has_root_span", num_has_root_spans)

Expand All @@ -780,8 +750,6 @@ def _load_segment_data(

payloads: dict[SegmentKey, list[bytes]] = {key: [] for key in segment_keys}
payload_keys_map: dict[SegmentKey, list[PayloadKey]] = {key: [] for key in segment_keys}
self._last_decompress_latency_ms = 0
decompress_latency_ms = 0.0

# Maps each payload key back to the segment it belongs to.
# Multiple distributed payload keys map to one segment.
Expand Down Expand Up @@ -812,11 +780,7 @@ def _add_spans(key: SegmentKey, raw_data: bytes):
"""
Decompress and add spans to the segment.
"""
nonlocal decompress_latency_ms

decompress_start = time.monotonic()
decompressed = self._decompress_batch(raw_data)
decompress_latency_ms += (time.monotonic() - decompress_start) * 1000
payloads[key].extend(decompressed)

while cursors:
Expand Down Expand Up @@ -923,8 +887,6 @@ def _add_spans(key: SegmentKey, raw_data: bytes):
# worst-case.
metrics.incr("spans.buffer.empty_segments")

self._last_decompress_latency_ms = int(decompress_latency_ms)

return payloads, payload_keys_map

def done_flush_segments(self, segment_keys: dict[SegmentKey, FlushedSegment]):
Expand Down
93 changes: 0 additions & 93 deletions src/sentry/spans/buffer_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,6 @@
LOGGING_INTERVAL = 60 # 1 minute in seconds


class FlusherLogEntry(NamedTuple):
    """
    Represents a single flush operation for a given project and trace.

    One entry is recorded per flushed, non-empty segment.
    """

    # Key of the form "<project_id>:<trace_id>" identifying the trace.
    project_and_trace: str
    # Number of span payloads contained in the flushed segment.
    span_count: int
    # Sum of the byte lengths of the flushed span payloads.
    bytes_flushed: int


class BufferAggregate(NamedTuple):
"""
Tracks the number of operations and cumulative latency for a given project and trace.
Expand All @@ -33,16 +23,6 @@ class BufferAggregate(NamedTuple):
cumulative_latency_ms: int


class FlusherAggregate(NamedTuple):
    """
    Tracks the number of segments, spans, and bytes flushed for a given project and trace.

    Instances are immutable; accumulating a new flush means building a new
    aggregate from the previous one.
    """

    # How many flushed segments have been folded into this aggregate.
    segment_count: int
    # Running total of spans across those segments.
    span_count: int
    # Running total of payload bytes across those segments.
    bytes_flushed: int


TAggregate = TypeVar("TAggregate", bound=tuple[Any, ...])


Expand Down Expand Up @@ -143,79 +123,6 @@ def log(self, entries: list[tuple[str, int]]) -> None:
)


class FlusherLogger:
    """
    Tracks per-trace flush operations and logs the dominant traces by
    cumulative bytes flushed.

    This logger keeps a bounded map (max 50 entries) of project_and_trace keys
    to their segment counts, span counts, and cumulative bytes.
    Every minute the top 50 traces by cumulative bytes are logged at INFO level,
    along with the cumulative per-phase latencies over the logging interval.

    NOTE(review): the bounding, the interval check and the actual log emission
    are delegated to `_prune_and_maybe_log`; the limits quoted above are
    assumed to be enforced there -- confirm against that helper.
    """

    def __init__(self) -> None:
        # project_and_trace key -> running totals since the last emitted log.
        self._metrics_per_trace: dict[str, FlusherAggregate] = {}
        # Per-phase latencies summed across log() calls.
        self._cumulative_load_ids_latency_ms: int = 0
        self._cumulative_load_data_latency_ms: int = 0
        self._cumulative_decompress_latency_ms: int = 0
        # Value handed back by `_prune_and_maybe_log` on the previous call.
        self._last_log_time: float | None = None

    def log(
        self,
        entries: list[FlusherLogEntry],
        load_ids_latency_ms: int,
        load_data_latency_ms: int,
        decompress_latency_ms: int,
    ) -> None:
        """
        Record a batch of flush operations and periodically log the top traces sorted by
        cumulative bytes flushed.

        :param entries: one entry per flushed segment in this batch.
        :param load_ids_latency_ms: time spent loading segment ids for the batch.
        :param load_data_latency_ms: time spent loading segment data for the batch.
        :param decompress_latency_ms: time spent decompressing payloads for the batch.
        """

        # Does nothing at all while the option is off, even if a caller invokes it.
        if not options.get("spans.buffer.flusher-cumulative-logger-enabled"):
            return

        self._cumulative_load_ids_latency_ms += load_ids_latency_ms
        self._cumulative_load_data_latency_ms += load_data_latency_ms
        self._cumulative_decompress_latency_ms += decompress_latency_ms

        for entry in entries:
            # Single lookup: None means this trace has not been seen since the
            # aggregates were last cleared.
            previous = self._metrics_per_trace.get(entry.project_and_trace)
            if previous is None:
                updated = FlusherAggregate(
                    1,
                    entry.span_count,
                    entry.bytes_flushed,
                )
            else:
                updated = FlusherAggregate(
                    previous.segment_count + 1,
                    previous.span_count + entry.span_count,
                    previous.bytes_flushed + entry.bytes_flushed,
                )
            self._metrics_per_trace[entry.project_and_trace] = updated

        self._last_log_time = _prune_and_maybe_log(
            self._metrics_per_trace,
            self._last_log_time,
            # Index 2 of FlusherAggregate is bytes_flushed.
            sort_index=2,
            log_message="spans.buffer.top_flush_operations_by_bytes",
            entries_key="top_flush_operations",
            format_entry=lambda key, val: (
                f"{key}:{val.segment_count}:{val.span_count}:{val.bytes_flushed}"
            ),
            extra={
                "cumulative_load_ids_latency_ms": self._cumulative_load_ids_latency_ms,
                "cumulative_load_data_latency_ms": self._cumulative_load_data_latency_ms,
                "cumulative_decompress_latency_ms": self._cumulative_decompress_latency_ms,
            },
        )
        # Start a fresh latency window whenever the helper hands back None.
        # NOTE(review): presumably None signals that the periodic log was just
        # emitted -- confirm against `_prune_and_maybe_log`.
        if self._last_log_time is None:
            self._cumulative_load_ids_latency_ms = 0
            self._cumulative_load_data_latency_ms = 0
            self._cumulative_decompress_latency_ms = 0


type DataPoint = tuple[bytes, float]
type EvalshaData = list[DataPoint]

Expand Down
15 changes: 0 additions & 15 deletions src/sentry/spans/consumers/process/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,25 +330,10 @@ def produce(project_id: int, payload: KafkaPayload, dropped: int) -> None:
continue

with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
log_flushed_segments = options.get("spans.buffer.flusher.log-flushed-segments")

for segment_key, flushed_segment in flushed_segments.items():
if not flushed_segment.spans:
continue

if log_flushed_segments:
logger.info(
"spans.buffer.flushed_segment",
extra={
"segment_key": segment_key.decode("utf-8", errors="replace"),
"queue_key": flushed_segment.queue_key.decode(
"utf-8", errors="replace"
),
"span_count": len(flushed_segment.spans),
"project_id": flushed_segment.project_id,
},
)

for message in flushed_segment.to_messages():
kafka_payload = KafkaPayload(None, orjson.dumps(message), [])
metrics.timing(
Expand Down
4 changes: 1 addition & 3 deletions tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@
"spans.buffer.flusher.max-unhealthy-seconds": 60,
"spans.buffer.flusher.use-stuck-detector": False,
"spans.buffer.flusher.flush-lock-ttl": 0,
"spans.buffer.flusher-cumulative-logger-enabled": False,
"spans.buffer.flusher.log-flushed-segments": False,
Comment thread
cursor[bot] marked this conversation as resolved.
"spans.buffer.done-flush-conditional-zrem": True,
"spans.buffer.compression.level": 0,
"spans.buffer.pipeline-batch-size": 0,
"spans.buffer.max-spans-per-evalsha": 0,
"spans.buffer.evalsha-latency-threshold": 100,
"spans.buffer.debug-traces": [],
"spans.buffer.evalsha-cumulative-logger-enabled": True,
"spans.buffer.debug-traces": [],
"spans.process-segments.schema-validation": 1.0,
}

Expand Down
Loading
Loading