3 changes: 1 addition & 2 deletions CHANGELOG.md
@@ -22,10 +22,9 @@

- No longer writes Spans as trace items. ([#5152](https://github.com/getsentry/relay/pull/5152))
- Produce spans to `ingest-spans` by default. ([#5163](https://github.com/getsentry/relay/pull/5163))
- Add ability to produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151))
- Add `retentions` to the project configuration and use them for logs. ([#5135](https://github.com/getsentry/relay/pull/5135))
- Produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151), [#5173](https://github.com/getsentry/relay/pull/5173), [#5199](https://github.com/getsentry/relay/pull/5199))
- Modernize session processing and move to Relay's new processing framework. ([#5201](https://github.com/getsentry/relay/pull/5201))
- Add ability to produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151), [#5173](https://github.com/getsentry/relay/pull/5173))

## 25.9.0

4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -177,7 +177,7 @@ sentry = { version = "0.41.0", default-features = false, features = [
"transport",
] }
sentry-core = "0.41.0"
sentry-kafka-schemas = { version = "2.1.1", default-features = false }
sentry-kafka-schemas = { version = "2.1.6", default-features = false }
sentry-release-parser = { version = "1.3.2", default-features = false }
sentry-types = "0.41.0"
sentry_protos = "0.3.3"
8 changes: 0 additions & 8 deletions relay-dynamic-config/src/global.rs
@@ -186,14 +186,6 @@ pub struct Options {
)]
pub replay_relay_snuba_publish_disabled_sample_rate: f32,

/// Fraction of spans that are produced as backward-compatible Span V2 kafka messages.
#[serde(
rename = "relay.kafka.span-v2.sample-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub span_kafka_v2_sample_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
8 changes: 7 additions & 1 deletion relay-server/src/managed/counted.rs
@@ -1,5 +1,5 @@
use relay_event_schema::protocol::{
OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2,
CompatSpan, OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2,
};
use relay_protocol::Annotated;
use relay_quotas::DataCategory;
@@ -105,6 +105,12 @@ impl Counted for Annotated<Span> {
}
}

impl Counted for Annotated<CompatSpan> {
fn quantities(&self) -> Quantities {
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
}
}

impl Counted for ExtractedMetrics {
fn quantities(&self) -> Quantities {
// We only consider project metrics, sampling project metrics should never carry outcomes,
47 changes: 26 additions & 21 deletions relay-server/src/processing/spans/mod.rs
@@ -2,6 +2,8 @@ use std::sync::Arc;

use relay_event_normalization::GeoIpLookup;
use relay_event_schema::processor::ProcessingAction;
#[cfg(feature = "processing")]
use relay_event_schema::protocol::CompatSpan;
use relay_event_schema::protocol::SpanV2;
use relay_quotas::{DataCategory, RateLimits};

@@ -197,14 +199,24 @@ impl Forward for SpanOutput {
let envelope = spans.map(|spans, records| {
let mut items = Items::with_capacity(spans.spans.len());
for span in spans.spans {
let mut span = span.value.map_value(relay_spans::span_v2_to_span_v1);
use relay_protocol::Annotated;

let mut span = match span.value.map_value(CompatSpan::try_from) {
Annotated(Some(Result::Err(error)), _) => {
// TODO: Use records.internal_error(error, span)
relay_log::error!(
error = &error as &dyn std::error::Error,
"Failed to create CompatSpan"
);
continue;
}
Annotated(Some(Result::Ok(compat_span)), meta) => {
Annotated(Some(compat_span), meta)
}
Annotated(None, meta) => Annotated(None, meta),
};
if let Some(span) = span.value_mut() {
inject_server_sample_rate(span, spans.server_sample_rate);

// TODO: this needs to be done in a normalization step, which is yet to be
// implemented.
span.received =
relay_event_schema::protocol::Timestamp(chrono::Utc::now()).into();
inject_server_sample_rate(&mut span.span_v2, spans.server_sample_rate);
}

let mut item = Item::new(ItemType::Span);
@@ -215,7 +227,10 @@
continue;
}
};
item.set_payload(ContentType::Json, payload);
item.set_payload(ContentType::CompatSpan, payload);
if let Some(trace_id) = span.value().and_then(|s| s.span_v2.trace_id.value()) {
item.set_routing_hint(*trace_id.as_ref());
}
items.push(item);
}

@@ -238,24 +253,14 @@
/// Ideally we forward a proper data structure to the store instead, then we don't
/// have to inject the sample rate into an attribute.
#[cfg(feature = "processing")]
fn inject_server_sample_rate(
span: &mut relay_event_schema::protocol::Span,
server_sample_rate: Option<f64>,
) {
fn inject_server_sample_rate(span: &mut SpanV2, server_sample_rate: Option<f64>) {
let Some(server_sample_rate) = server_sample_rate.and_then(relay_protocol::FiniteF64::new)
else {
return;
};

let measurements = span.measurements.get_or_insert_with(Default::default);
measurements.0.insert(
"server_sample_rate".to_owned(),
relay_event_schema::protocol::Measurement {
value: server_sample_rate.into(),
unit: None.into(),
}
.into(),
);
let attributes = span.attributes.get_or_insert_with(Default::default);
attributes.insert("sentry.server_sample_rate", server_sample_rate.to_f64());
}

/// Spans in their serialized state, as transported in an envelope.
26 changes: 4 additions & 22 deletions relay-server/src/services/processor/span/processing.rs
@@ -91,7 +91,6 @@ pub async fn process(
geo_lookup,
);

let org_id = managed_envelope.scoping().organization_id.value();
let client_ip = managed_envelope.envelope().meta().client_addr();
let filter_settings = &project_info.config.filter_settings;
let sampling_decision = sampling_result.decision();
@@ -230,8 +229,7 @@
}
};

let Ok(mut new_item) = create_span_item(annotated_span, &config, global_config, org_id)
else {
let Ok(mut new_item) = create_span_item(annotated_span, &config) else {
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

@@ -254,14 +252,9 @@
}
}

fn create_span_item(
span: Annotated<Span>,
config: &Config,
global_config: &GlobalConfig,
org_id: u64,
) -> Result<Item, ()> {
fn create_span_item(span: Annotated<Span>, config: &Config) -> Result<Item, ()> {
let mut new_item = Item::new(ItemType::Span);
if produce_compat_spans(config, global_config, org_id) {
if cfg!(feature = "processing") && config.processing_enabled() {
Bug: JSON Serialization Error Causes Data Loss

The `create_span_item` function serializes spans as `ContentType::Json` when processing is disabled. This is inconsistent because the store service rejects all `ContentType::Json` spans, so those spans are silently dropped, leading to data loss.

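For illustration, a minimal self-contained sketch of the mismatch described above; the `SpanContentType`, `span_content_type`, and `store_accepts` names below are hypothetical stand-ins, not the actual Relay types or store-service code.

```rust
/// Hypothetical stand-in for Relay's payload content types; only the two
/// variants relevant to this comment are modeled here.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SpanContentType {
    /// Legacy Span V1 JSON payload.
    Json,
    /// Backward-compatible Span V2 payload produced for Kafka.
    CompatSpan,
}

/// Sketch of the decision made in `create_span_item`: processing relays
/// produce `CompatSpan` payloads, non-processing relays keep JSON because
/// they forward the envelope upstream instead of producing to Kafka.
fn span_content_type(processing_enabled: bool) -> SpanContentType {
    if processing_enabled {
        SpanContentType::CompatSpan
    } else {
        SpanContentType::Json
    }
}

/// Sketch of the store-side check the comment refers to: a store consumer
/// that only accepts `CompatSpan` would drop JSON span items.
fn store_accepts(content_type: SpanContentType) -> bool {
    content_type == SpanContentType::CompatSpan
}

fn main() {
    // A processing relay produces an item the store accepts.
    assert!(store_accepts(span_content_type(true)));
    // A non-processing relay produces a JSON item; if such an item ever
    // reached the store, it would be rejected and silently dropped.
    assert!(!store_accepts(span_content_type(false)));
}
```

Whether the non-processing path can actually reach the store consumer is the open question this comment raises; the sketch only restates the invariant that items produced for the store must not be `Json`.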

let span_v2 = span.map_value(relay_spans::span_v1_to_span_v2);
let compat_span = match span_v2.map_value(CompatSpan::try_from) {
Annotated(Some(Result::Err(err)), _) => {
@@ -297,15 +290,6 @@ fn create_span_item(
Ok(new_item)
}

/// Whether or not to convert spans into backward-compatible V2 spans.
///
/// This only makes sense when we forward the envelope to Kafka.
fn produce_compat_spans(config: &Config, global_config: &GlobalConfig, org_id: u64) -> bool {
cfg!(feature = "processing")
&& config.processing_enabled()
&& utils::is_rolled_out(org_id, global_config.options.span_kafka_v2_sample_rate).is_keep()
}

fn add_sample_rate(measurements: &mut Annotated<Measurements>, name: &str, value: Option<f64>) {
let value = match value {
Some(value) if value > 0.0 => value,
@@ -352,8 +336,6 @@ pub fn extract_from_event(
.dsc()
.and_then(|ctx| ctx.sample_rate);

let org_id = managed_envelope.scoping().organization_id.value();

let mut add_span = |mut span: Span| {
add_sample_rate(
&mut span.measurements,
@@ -387,7 +369,7 @@
}
};

let Ok(mut item) = create_span_item(span, &config, global_config, org_id) else {
let Ok(mut item) = create_span_item(span, &config) else {
managed_envelope.track_outcome(
Outcome::Invalid(DiscardReason::InvalidSpan),
relay_quotas::DataCategory::SpanIndexed,