diff --git a/src/sentry/spans/README.md b/src/sentry/spans/README.md
index 6955d8a8a502ce..e21ac13eb319ae 100644
--- a/src/sentry/spans/README.md
+++ b/src/sentry/spans/README.md
@@ -83,14 +83,9 @@ event types are limited in terms of frequency.
     breaks the structure of the trace as the missing spans may be anywhere in the
     tree.
 
-  - As we extract the subsegments and reassemble them, if the segment is too big
-    we either drop it or chunk it depending on the
-    `spans.buffer.chunk-oversized-segments` option:
-    - **Default (disabled)**: The segment is dropped entirely and an `invalid`
-      outcome is recorded.
-    - **Enabled**: The segment is kept and split into multiple Kafka messages,
-      each within `max-segment-bytes`, and every chunk is sent with the flag
-      `skip_enrichment=True`.
+  - As we extract the subsegments and reassemble them, if the segment size exceeds
+    the `max-segment-bytes` limit, we chunk it into multiple Kafka messages, each
+    within that limit, and every chunk is sent with the flag `skip_enrichment=True`.
 
 ### Flushing segments
 
diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py
index b0771943012d75..3f5cbbfc6c0174 100644
--- a/src/sentry/spans/buffer.py
+++ b/src/sentry/spans/buffer.py
@@ -197,16 +197,12 @@ def to_messages(self) -> list[dict[str, Any]]:
         """
         Build producer messages for this segment.
 
-        If chunk-oversized-segments is enabled and the segment exceeds
-        max_segment_bytes, the segment is split into multiple messages with
-        skip_enrichment=True. Otherwise, returns a single message.
+        If the segment size exceeds `spans.buffer.max-segment-bytes`, the segment is split
+        into multiple messages with skip_enrichment=True. Otherwise, returns a single message.
         """
-        chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
         max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
 
         spans: list[SpanPayload] = [span.payload for span in self.spans]
-        if not chunk_oversized_segments:
-            return [{"spans": spans}]
 
         sizes = [len(orjson.dumps(s)) for s in spans]
         if sum(sizes) <= max_segment_bytes:
@@ -228,7 +224,11 @@ def to_messages(self) -> list[dict[str, Any]]:
             messages.append({"spans": current, "skip_enrichment": True})
 
         if len(messages) > 1:
-            metrics.incr("spans.buffer.oversized_segments_chunked_messages", len(messages))
+            metrics.timing(
+                "spans.buffer.oversized_segments_chunked",
+                len(messages),
+            )
+            metrics.timing("spans.buffer.oversized_segments_size", sum(sizes))
 
         return messages
 
@@ -764,11 +764,9 @@ def _load_segment_data(
         """
        page_size = options.get("spans.buffer.segment-page-size")
-        max_segment_bytes = options.get("spans.buffer.max-segment-bytes")
 
         payloads: dict[SegmentKey, list[bytes]] = {key: [] for key in segment_keys}
         payload_keys_map: dict[SegmentKey, list[PayloadKey]] = {key: [] for key in segment_keys}
-        sizes: dict[SegmentKey, int] = {key: 0 for key in segment_keys}
 
         self._last_decompress_latency_ms = 0
         decompress_latency_ms = 0.0
@@ -795,31 +793,18 @@
                     cursors[payload_key] = 0
                 payload_keys_map[key] = segment_payload_keys
 
-        chunk_oversized_segments = options.get("spans.buffer.chunk-oversized-segments")
         dropped_segments: set[SegmentKey] = set()
 
-        def _add_spans(key: SegmentKey, raw_data: bytes) -> bool:
+        def _add_spans(key: SegmentKey, raw_data: bytes) -> None:
             """
-            Decompress and add spans to the segment. Returns False if the
-            segment exceeded max_segment_bytes and was dropped.
+            Decompress and add spans to the segment.
""" nonlocal decompress_latency_ms decompress_start = time.monotonic() decompressed = self._decompress_batch(raw_data) decompress_latency_ms += (time.monotonic() - decompress_start) * 1000 - - sizes[key] = sizes.get(key, 0) + sum(len(span) for span in decompressed) - if sizes[key] > max_segment_bytes and not chunk_oversized_segments: - metrics.incr("spans.buffer.flush_segments.segment_size_exceeded") - logger.warning("Skipping too large segment, byte size %s", sizes[key]) - payloads.pop(key, None) - sizes.pop(key, None) - dropped_segments.add(key) - return False - payloads[key].extend(decompressed) - return True while cursors: with self.client.pipeline(transaction=False) as p: @@ -836,15 +821,11 @@ def _add_spans(key: SegmentKey, raw_data: bytes) -> bool: cursors.pop(key, None) continue - size_exceeded = False for scan_value in scan_values: if segment_key in payloads: - if not _add_spans(segment_key, scan_value): - size_exceeded = True + _add_spans(segment_key, scan_value) - if size_exceeded: - cursors.pop(key, None) - elif cursor == 0: + if cursor == 0: del cursors[key] else: cursors[key] = cursor diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index 6f319f69fdaafc..1e4c8e30c8d41e 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -40,7 +40,6 @@ "spans.buffer.evalsha-cumulative-logger-enabled": True, "spans.buffer.enforce-segment-size": False, "spans.process-segments.schema-validation": 1.0, - "spans.buffer.chunk-oversized-segments": False, } @@ -838,9 +837,10 @@ def test_max_segment_spans_limit(mock_project_model, buffer: SpansBuffer) -> Non buffer.process_spans(batch2, now=0) rv = buffer.flush_segments(now=11) - # The entire segment should be dropped because it exceeds max_segment_bytes. + # The segment is kept even though it exceeds max_segment_bytes, + # because oversized segments are chunked at the message level. 
     segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
-    assert segment.spans == []
+    assert len(segment.spans) == 5
 
 
 @mock.patch("sentry.spans.buffer.Project")
@@ -960,135 +960,9 @@ def test_max_segment_bytes_under_limit_merges_normally(
     assert span_ids == {"b" * 16, "c" * 16}
 
 
-@mock.patch("sentry.spans.buffer.Project")
-@mock.patch("sentry.spans.buffer.track_outcome")
-@mock.patch("sentry.spans.buffer.metrics.timing")
-def test_dropped_spans_emit_outcomes(
-    mock_metrics, mock_track_outcome, mock_project_model, buffer: SpansBuffer
-) -> None:
-    """Test that outcomes are emitted when Redis drops spans due to size limit."""
-    from sentry.constants import DataCategory
-    from sentry.utils.outcomes import Outcome
-
-    # Mock the project lookup
-    mock_project = mock.Mock()
-    mock_project.id = 1
-    mock_project.organization_id = 100
-    mock_project_model.objects.get_from_cache.return_value = mock_project
-
-    payload_a = _payload("a" * 16)
-    payload_b = _payload("b" * 16)
-    payload_c = _payload("c" * 16)
-    payload_d = _payload("d" * 16)
-    payload_e = _payload("e" * 16)
-    payload_f = _payload("f" * 16)
-
-    # Create a segment with many spans that will exceed the Redis memory limit
-    batch1 = [
-        Span(
-            payload=payload_b,
-            trace_id="a" * 32,
-            span_id="b" * 16,
-            parent_span_id="a" * 16,
-            segment_id=None,
-            project_id=1,
-        ),
-        Span(
-            payload=payload_c,
-            trace_id="a" * 32,
-            span_id="c" * 16,
-            parent_span_id="a" * 16,
-            segment_id=None,
-            project_id=1,
-        ),
-        Span(
-            payload=payload_d,
-            trace_id="a" * 32,
-            span_id="d" * 16,
-            parent_span_id="a" * 16,
-            segment_id=None,
-            project_id=1,
-        ),
-    ]
-    batch2 = [
-        Span(
-            payload=payload_e,
-            trace_id="a" * 32,
-            span_id="e" * 16,
-            parent_span_id="a" * 16,
-            segment_id=None,
-            project_id=1,
-        ),
-        Span(
-            payload=payload_f,
-            trace_id="a" * 32,
-            span_id="f" * 16,
-            parent_span_id="a" * 16,
-            segment_id=None,
-            project_id=1,
-        ),
-        Span(
-            payload=payload_a,
-            trace_id="a" * 32,
-            span_id="a" * 16,
-            parent_span_id=None,
-            project_id=1,
-            segment_id=None,
-            is_segment_span=True,
-        ),
-    ]
-
-    expected_bytes = sum(
-        len(p) for p in [payload_a, payload_b, payload_c, payload_d, payload_e, payload_f]
-    )
-
-    # Set a very small max-segment-bytes to force Redis to drop spans
-    with override_options({"spans.buffer.max-segment-bytes": 100}):
-        buffer.process_spans(batch1, now=0)
-        buffer.process_spans(batch2, now=0)
-        buffer.flush_segments(now=11)
-
-    # Verify that track_outcome was called
-    assert mock_track_outcome.called, "track_outcome should be called when spans are dropped"
-
-    # Find the call with INVALID outcome
-    outcome_calls = [
-        call
-        for call in mock_track_outcome.call_args_list
-        if call.kwargs.get("outcome") == Outcome.INVALID
-    ]
-    assert len(outcome_calls) > 0, "Should have at least one INVALID outcome"
-
-    # Verify the outcome details
-    outcome_call = outcome_calls[0]
-    assert outcome_call.kwargs["org_id"] == 100
-    assert outcome_call.kwargs["project_id"] == 1
-    assert outcome_call.kwargs["outcome"] == Outcome.INVALID
-    assert outcome_call.kwargs["reason"] == "segment_too_large"
-    assert outcome_call.kwargs["category"] == DataCategory.SPAN_INDEXED
-    assert outcome_call.kwargs["quantity"] > 0, "Should have dropped at least some spans"
-
-    # Verify ingested span count and byte count metrics were emitted
-    ingested_spans_timing_calls = [
-        call
-        for call in mock_metrics.call_args_list
-        if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_spans_per_segment"
-    ]
-    assert len(ingested_spans_timing_calls) == 1, "Should emit ingested_spans_per_segment metric"
ingested_spans_per_segment metric" - assert ingested_spans_timing_calls[0].args[1] == 6, "Should have ingested 6 spans" - - ingested_bytes_timing_calls = [ - call - for call in mock_metrics.call_args_list - if call.args and call.args[0] == "spans.buffer.flush_segments.ingested_bytes_per_segment" - ] - assert len(ingested_bytes_timing_calls) == 1, "Should emit ingested_bytes_per_segment metric" - assert ingested_bytes_timing_calls[0].args[1] == expected_bytes - - @mock.patch("sentry.spans.buffer.Project") def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> None: - """When chunk-oversized-segments is enabled, oversized segments are kept instead of dropped.""" + """Test that oversized segments are kept instead of dropped.""" mock_project = mock.Mock() mock_project.id = 1 mock_project.organization_id = 100 @@ -1140,9 +1014,7 @@ def test_flush_oversized_segments(mock_project_model, buffer: SpansBuffer) -> No ), ] - with override_options( - {"spans.buffer.max-segment-bytes": 100, "spans.buffer.chunk-oversized-segments": True} - ): + with override_options({"spans.buffer.max-segment-bytes": 100}): buffer.process_spans(batch1, now=0) buffer.process_spans(batch2, now=0) rv = buffer.flush_segments(now=11) @@ -1178,7 +1050,6 @@ def test_to_messages_under_limit(buffer: SpansBuffer) -> None: with override_options( { **DEFAULT_OPTIONS, - "spans.buffer.chunk-oversized-segments": True, "spans.buffer.max-segment-bytes": 10000, } ): @@ -1224,7 +1095,6 @@ def test_to_messages_splits_oversized(buffer: SpansBuffer) -> None: with override_options( { **DEFAULT_OPTIONS, - "spans.buffer.chunk-oversized-segments": True, "spans.buffer.max-segment-bytes": 500, } ): @@ -1254,7 +1124,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None: with override_options( { **DEFAULT_OPTIONS, - "spans.buffer.chunk-oversized-segments": True, "spans.buffer.max-segment-bytes": 10, } ): @@ -1263,25 +1132,6 @@ def test_to_messages_single_large_span(buffer: SpansBuffer) -> None: assert messages[0]["skip_enrichment"] is True -def test_to_messages_no_chunking_when_option_disabled(buffer: SpansBuffer) -> None: - """When chunk-oversized-segments is disabled, always returns a single message.""" - segment = FlushedSegment( - queue_key=b"test", - spans=[OutputSpan(payload={"span_id": "a" * 16})], - project_id=1, - ) - with override_options( - { - **DEFAULT_OPTIONS, - "spans.buffer.chunk-oversized-segments": False, - "spans.buffer.max-segment-bytes": 10, - } - ): - messages = segment.to_messages() - assert len(messages) == 1 - assert "skip_enrichment" not in messages[0] - - def test_kafka_slice_id(buffer: SpansBuffer) -> None: with override_options(DEFAULT_OPTIONS): buffer = SpansBuffer(assigned_shards=list(range(1)), slice_id=2)