Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ dependencies = [
# [end] jsonschema format validators
"sentry-arroyo>=2.25.5",
"sentry-forked-email-reply-parser>=0.5.12.post1",
"sentry-kafka-schemas>=2.1.3",
"sentry-kafka-schemas>=2.1.6",
"sentry-ophio>=1.1.3",
"sentry-protos>=0.4.0",
"sentry-redis-tools>=0.5.0",
Expand Down
14 changes: 6 additions & 8 deletions src/sentry/insights/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ def from_span_v1(cls, span: dict[str, Any]) -> "FilterSpan":
)

@classmethod
def from_span_data(cls, data: dict[str, Any]) -> "FilterSpan":
"""Get relevant fields from `span.data`.

This will later be replaced by `from_span_attributes` or `from_span_v2`."""
def from_span_attributes(cls, attributes: dict[str, Any]) -> "FilterSpan":
"""Get relevant fields from `span.attributes`."""
return cls(
op=data.get("sentry.op"),
category=data.get("sentry.category"),
description=data.get("sentry.description"),
transaction_op=data.get("sentry.transaction_op"),
op=(attributes.get("sentry.op") or {}).get("value"),
category=(attributes.get("sentry.category") or {}).get("value"),
description=(attributes.get("sentry.description") or {}).get("value"),
transaction_op=(attributes.get("sentry.transaction_op") or {}).get("value"),
)


Expand Down
29 changes: 18 additions & 11 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

from sentry import options
from sentry.processing.backpressure.memory import ServiceMemory, iter_cluster_memory_usage
from sentry.spans.consumers.process_segments.types import attribute_value
from sentry.utils import metrics, redis

# SegmentKey is an internal identifier used by the redis buffer that is also
Expand Down Expand Up @@ -129,7 +130,7 @@ class Span(NamedTuple):
segment_id: str | None
project_id: int
payload: bytes
end_timestamp_precise: float
end_timestamp: float
is_segment_span: bool = False

def effective_parent_id(self):
Expand Down Expand Up @@ -339,7 +340,7 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[

def _prepare_payloads(self, spans: list[Span]) -> dict[str | bytes, float]:
if self._zstd_compressor is None:
return {span.payload: span.end_timestamp_precise for span in spans}
return {span.payload: span.end_timestamp for span in spans}

combined = b"\x00".join(span.payload for span in spans)
original_size = len(combined)
Expand All @@ -354,7 +355,7 @@ def _prepare_payloads(self, spans: list[Span]) -> dict[str | bytes, float]:
metrics.timing("spans.buffer.compression.compressed_size", compressed_size)
metrics.timing("spans.buffer.compression.compression_ratio", compression_ratio)

min_timestamp = min(span.end_timestamp_precise for span in spans)
min_timestamp = min(span.end_timestamp for span in spans)
return {compressed: min_timestamp}

def _decompress_batch(self, compressed_data: bytes) -> list[bytes]:
Expand Down Expand Up @@ -428,17 +429,23 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
has_root_span = False
metrics.timing("spans.buffer.flush_segments.num_spans_per_segment", len(segment))
for payload in segment:
val = orjson.loads(payload)

if not val.get("segment_id"):
val["segment_id"] = segment_span_id

is_segment = segment_span_id == val["span_id"]
val["is_segment"] = is_segment
span = orjson.loads(payload)

if not attribute_value(span, "sentry.segment.id"):
span.setdefault("attributes", {})["sentry.segment.id"] = {
"type": "string",
"value": segment_span_id,
}

is_segment = segment_span_id == span["span_id"]
span.setdefault("attributes", {})["sentry.is_segment"] = {
"type": "boolean",
"value": is_segment,
}
if is_segment:
has_root_span = True

output_spans.append(OutputSpan(payload=val))
output_spans.append(OutputSpan(payload=span))

metrics.incr(
"spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard}
Expand Down
5 changes: 3 additions & 2 deletions src/sentry/spans/consumers/process/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sentry import killswitches
from sentry.spans.buffer import Span, SpansBuffer
from sentry.spans.consumers.process.flusher import SpanFlusher
from sentry.spans.consumers.process_segments.types import attribute_value
from sentry.utils import metrics
from sentry.utils.arroyo import MultiprocessingPool, SetJoinTimeout, run_task_with_multiprocessing

Expand Down Expand Up @@ -182,10 +183,10 @@ def process_batch(
trace_id=val["trace_id"],
span_id=val["span_id"],
parent_span_id=val.get("parent_span_id"),
segment_id=val.get("segment_id"),
segment_id=cast(str | None, attribute_value(val, "sentry.segment.id")),
project_id=val["project_id"],
payload=payload.value,
end_timestamp_precise=val["end_timestamp_precise"],
end_timestamp=val["end_timestamp"],
is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
)
spans.append(span)
Expand Down
102 changes: 59 additions & 43 deletions src/sentry/spans/consumers/process_segments/convert.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections.abc import MutableMapping
from typing import Any, cast

import orjson
Expand All @@ -13,58 +12,69 @@
I64_MAX = 2**63 - 1

FIELD_TO_ATTRIBUTE = {
"description": "sentry.raw_description",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer necessary because sentry.description is instead renamed to sentry.raw_description?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!

"duration_ms": "sentry.duration_ms",
"is_segment": "sentry.is_segment",
"exclusive_time_ms": "sentry.exclusive_time_ms",
"start_timestamp_precise": "sentry.start_timestamp_precise",
"end_timestamp_precise": "sentry.end_timestamp_precise",
"end_timestamp": "sentry.end_timestamp_precise",
"event_id": "sentry.event_id",
"hash": "sentry.hash",
"is_remote": "sentry.is_remote",
"kind": "sentry.kind",
"name": "sentry.name",
"parent_span_id": "sentry.parent_span_id",
"profile_id": "sentry.profile_id",
"segment_id": "sentry.segment_id",
"received": "sentry.received",
"origin": "sentry.origin",
"kind": "sentry.kind",
"hash": "sentry.hash",
"event_id": "sentry.event_id",
"start_timestamp": "sentry.start_timestamp_precise",
}

# Maps an attribute's sentry-conventions name to the name the product currently
# expects. convert_span_to_item moves the value from the first key to the second.
RENAME_ATTRIBUTES = {
"sentry.description": "sentry.raw_description",
"sentry.segment.id": "sentry.segment_id",
}


def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
attributes: MutableMapping[str, AnyValue] = {} # TODO
attributes: dict[str, AnyValue] = {}

client_sample_rate = 1.0
server_sample_rate = 1.0

# This key is ambiguous. sentry-conventions and relay interpret it as "raw description",
# sentry interprets it as normalized_description.
# See https://github.com/getsentry/sentry/blob/7f2ccd1d03e8845a833fe1ee6784bce0c7f0b935/src/sentry/search/eap/spans/attributes.py#L596.
# Delete it and rely on top-level `description` for now.
(span.get("data") or {}).pop("sentry.description", None)

for k, v in (span.get("data") or {}).items():
if v is not None:
try:
attributes[k] = _anyvalue(v)
except Exception:
sentry_sdk.capture_exception()
else:
if k == "sentry.client_sample_rate":
try:
client_sample_rate = float(v)
except ValueError:
pass
elif k == "sentry.server_sample_rate":
try:
server_sample_rate = float(v)
except ValueError:
pass
for k, attribute in (span.get("attributes") or {}).items():
if attribute is None:
continue
if (value := attribute.get("value")) is None:
continue
try:
# NOTE: This ignores the `type` field of the attribute itself
attributes[k] = _anyvalue(value)
except Exception:
sentry_sdk.capture_exception()
else:
if k == "sentry.client_sample_rate":
try:
client_sample_rate = float(value) # type:ignore[arg-type]
except ValueError:
pass
elif k == "sentry.server_sample_rate":
try:
server_sample_rate = float(value) # type:ignore[arg-type]
except ValueError:
pass

for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items():
v = span.get(field_name)
if v is not None:
attributes[attribute_name] = _anyvalue(v)
attribute = span.get(field_name) # type:ignore[assignment]
if attribute is not None:
attributes[attribute_name] = _anyvalue(attribute)

# Rename some attributes from their sentry-conventions name to what the product currently expects.
# Eventually this should all be handled by deprecation policies in sentry-conventions.
for convention_name, eap_name in RENAME_ATTRIBUTES.items():
if convention_name in attributes:
attributes[eap_name] = attributes.pop(convention_name)

try:
# TODO: Move this to Relay
attributes["sentry.duration_ms"] = AnyValue(
int_value=int(1000 * (span["end_timestamp"] - span["start_timestamp"]))
)
except Exception:
sentry_sdk.capture_exception()

if links := span.get("links"):
try:
Expand All @@ -80,7 +90,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
trace_id=span["trace_id"],
item_id=int(span["span_id"], 16).to_bytes(16, "little"),
item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
timestamp=_timestamp(span["start_timestamp_precise"]),
timestamp=_timestamp(span["start_timestamp"]),
attributes=attributes,
client_sample_rate=client_sample_rate,
server_sample_rate=server_sample_rate,
Expand Down Expand Up @@ -132,7 +142,10 @@ def _sanitize_span_link(link: SpanLink) -> SpanLink:
# might be an intermediary state where there is a pre-existing dropped
# attributes count. Respect that count, if it's present. It should always be
# an integer.
dropped_attributes_count = attributes.get("sentry.dropped_attributes_count", 0)
try:
dropped_attributes_count = int(attributes["sentry.dropped_attributes_count"]["value"]) # type: ignore[arg-type,index]
except (KeyError, ValueError, TypeError):
dropped_attributes_count = 0

for key, value in attributes.items():
if key in ALLOWED_LINK_ATTRIBUTE_KEYS:
Expand All @@ -141,7 +154,10 @@ def _sanitize_span_link(link: SpanLink) -> SpanLink:
dropped_attributes_count += 1

if dropped_attributes_count > 0:
allowed_attributes["sentry.dropped_attributes_count"] = dropped_attributes_count
allowed_attributes["sentry.dropped_attributes_count"] = {
"type": "integer",
"value": dropped_attributes_count,
}

# Only include the `attributes` key if the key was present in the original
# link, don't create an empty object, since there is a semantic difference
Expand Down
Loading
Loading