diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9e487490d9..f275bc1c8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,7 +17,7 @@
 - Copy transaction tags to the profile. ([#1982](https://github.com/getsentry/relay/pull/1982))
 - Lower default max compressed replay recording segment size to 10 MiB. ([#2031](https://github.com/getsentry/relay/pull/2031))
 - Increase chunking limit to 15MB for replay recordings. ([#2032](https://github.com/getsentry/relay/pull/2032))
-- Add a data category for indexed profiles. ([#2051](https://github.com/getsentry/relay/pull/2051))
+- Add a data category for indexed profiles. ([#2051](https://github.com/getsentry/relay/pull/2051), [#2071](https://github.com/getsentry/relay/pull/2071))
 - Differentiate between `Profile` and `ProfileIndexed` outcomes. ([#2054](https://github.com/getsentry/relay/pull/2054))
 - Split dynamic sampling implementation before refactoring. ([#2047](https://github.com/getsentry/relay/pull/2047))
 - Refactor dynamic sampling implementation across `relay-server` and `relay-sampling`. ([#2066](https://github.com/getsentry/relay/pull/2066))
diff --git a/relay-server/src/actors/outcome.rs b/relay-server/src/actors/outcome.rs
index 1b9c97a245..3728324a95 100644
--- a/relay-server/src/actors/outcome.rs
+++ b/relay-server/src/actors/outcome.rs
@@ -895,8 +895,6 @@ impl OutcomeBroker {
     /// Returns true if the outcome represents profiles dropped by dynamic sampling.
     #[cfg(feature = "processing")]
     fn is_sampled_profile(outcome: &TrackRawOutcome) -> bool {
-        // Older external Relays will still emit a `Profile` outcome.
-        // Newer Relays will emit a `ProfileIndexed` outcome.
         (outcome.category == Some(DataCategory::Profile as u8)
             || outcome.category == Some(DataCategory::ProfileIndexed as u8))
             && outcome.outcome == OutcomeId::FILTERED
diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs
index 180ccb27c6..68f0efbebd 100644
--- a/relay-server/src/actors/processor.rs
+++ b/relay-server/src/actors/processor.rs
@@ -1097,10 +1097,13 @@ impl EnvelopeProcessorService {
     fn count_processed_profiles(&self, state: &mut ProcessEnvelopeState) {
         let profile_count: usize = state
             .managed_envelope
-            .envelope()
-            .items()
+            .envelope_mut()
+            .items_mut()
             .filter(|item| item.ty() == &ItemType::Profile)
-            .map(|item| item.quantity())
+            .map(|item| {
+                item.set_profile_counted_as_processed();
+                item.quantity()
+            })
             .sum();
 
         if profile_count == 0 {
@@ -1115,7 +1118,11 @@ impl EnvelopeProcessorService {
             remote_addr: None,
             category: DataCategory::Profile,
             quantity: profile_count as u32, // truncates to `u32::MAX`
-        })
+        });
+
+        // TODO: At this point, we should also ensure that the envelope summary gets recomputed.
+        // But recomputing the summary after extracting the event is currently problematic, because it
+        // sets the envelope type to `None`. This needs to be solved in a follow-up.
     }
 
     /// Process profiles and set the profile ID in the profile context on the transaction if successful
diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index 5b004b4b07..b22deffdf2 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -481,6 +481,13 @@ pub struct ItemHeaders {
     #[serde(default, skip_serializing_if = "is_false")]
     metrics_extracted: bool,
 
+    /// Internal flag to signal that the profile has been counted toward `DataCategory::Profile`.
+    ///
+    /// If `true`, outcomes for this item should be reported as `DataCategory::ProfileIndexed`
+    /// instead.
+    #[serde(skip)]
+    profile_counted_as_processed: bool,
+
     /// Other attributes for forward compatibility.
     #[serde(flatten)]
     other: BTreeMap<String, Value>,
@@ -507,6 +514,7 @@ impl Item {
                 timestamp: None,
                 other: BTreeMap::new(),
                 metrics_extracted: false,
+                profile_counted_as_processed: false,
             },
             payload: Bytes::new(),
         }
@@ -550,7 +558,9 @@ impl Item {
             ItemType::Metrics | ItemType::MetricBuckets => None,
             ItemType::FormData => None,
             ItemType::UserReport => None,
-            ItemType::Profile => Some(if indexed {
+            // For profiles, do not use the `indexed` flag, because it depends
+            // on whether metrics were extracted from the _event_.
+            ItemType::Profile => Some(if self.headers.profile_counted_as_processed {
                 DataCategory::ProfileIndexed
             } else {
                 DataCategory::Profile
@@ -653,6 +663,21 @@ impl Item {
         self.headers.metrics_extracted
     }
 
+    /// Returns `true` if the profiles in the envelope have been counted towards `DataCategory::Profile`.
+    ///
+    /// If so, count them towards `DataCategory::ProfileIndexed` instead.
+    pub fn profile_counted_as_processed(&self) -> bool {
+        self.headers.profile_counted_as_processed
+    }
+
+    /// Mark the item as "counted towards `DataCategory::Profile`".
+    #[cfg(feature = "processing")]
+    pub fn set_profile_counted_as_processed(&mut self) {
+        if self.ty() == &ItemType::Profile {
+            self.headers.profile_counted_as_processed = true;
+        }
+    }
+
     /// Sets the metrics extracted flag.
     pub fn set_metrics_extracted(&mut self, metrics_extracted: bool) {
         self.headers.metrics_extracted = metrics_extracted;
diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs
index c45e8ca978..84111f4c0f 100644
--- a/relay-server/src/utils/managed_envelope.rs
+++ b/relay-server/src/utils/managed_envelope.rs
@@ -312,7 +312,7 @@ impl ManagedEnvelope {
         if self.context.summary.profile_quantity > 0 {
             self.track_outcome(
                 outcome,
-                if self.context.summary.event_metrics_extracted {
+                if self.context.summary.profile_counted_as_processed {
                     DataCategory::ProfileIndexed
                 } else {
                     DataCategory::Profile
diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs
index 3bc0da73f4..167f1ea24f 100644
--- a/relay-server/src/utils/rate_limits.rs
+++ b/relay-server/src/utils/rate_limits.rs
@@ -143,6 +143,11 @@ pub struct EnvelopeSummary {
     /// Whether the envelope contains an event which already had the metrics extracted.
     pub event_metrics_extracted: bool,
 
+    /// Whether profiles in the envelope have been counted towards `DataCategory::Profile`.
+    ///
+    /// If `true`, count them towards `DataCategory::ProfileIndexed` instead.
+    pub profile_counted_as_processed: bool,
+
     /// The payload size of this envelope.
     pub payload_size: usize,
 }
@@ -169,6 +174,10 @@ impl EnvelopeSummary {
                 summary.event_metrics_extracted = true;
             }
 
+            if *item.ty() == ItemType::Profile && item.profile_counted_as_processed() {
+                summary.profile_counted_as_processed = true;
+            }
+
             // If the item has been rate limited before, the quota has been consumed and outcomes
             // emitted. We can skip it here.
             if item.rate_limited() {
@@ -510,7 +519,7 @@ where
 
         // It makes no sense to store profiles without transactions, so if the event
         // is rate limited, rate limit profiles as well.
-        let profile_category = if summary.event_metrics_extracted {
+        let profile_category = if summary.profile_counted_as_processed {
             DataCategory::ProfileIndexed
         } else {
             DataCategory::Profile
@@ -553,7 +562,7 @@ where
         let item_scoping = scoping.item(DataCategory::Profile);
         let profile_limits = (self.check)(item_scoping, summary.profile_quantity)?;
         enforcement.profiles = CategoryLimit::new(
-            if summary.event_metrics_extracted {
+            if summary.profile_counted_as_processed {
                 DataCategory::ProfileIndexed
             } else {
                 DataCategory::Profile
diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py
index b2449f84f7..20c999c1c8 100644
--- a/tests/integration/test_outcome.py
+++ b/tests/integration/test_outcome.py
@@ -1212,13 +1212,11 @@ def make_envelope():
     assert outcomes == expected_outcomes, outcomes
 
 
-@pytest.mark.parametrize("metrics_already_extracted", [False, True])
 @pytest.mark.parametrize("quota_category", ["transaction", "profile"])
 def test_profile_outcomes_rate_limited(
     mini_sentry,
     relay_with_processing,
     outcomes_consumer,
-    metrics_already_extracted,
     quota_category,
 ):
     """
@@ -1268,7 +1266,7 @@ def test_profile_outcomes_rate_limited(
             Item(
                 payload=PayloadRef(bytes=json.dumps(payload).encode()),
                 type="transaction",
-                headers={"metrics_extracted": metrics_already_extracted},
+                headers={"metrics_extracted": True},
            )
        )
        envelope.add_item(Item(payload=PayloadRef(bytes=profile), type="profile"))
@@ -1294,7 +1292,7 @@ def test_profile_outcomes_rate_limited(
 
     expected_outcomes += [
         {
-            "category": 11 if metrics_already_extracted else 6,
+            "category": 6,
             "key_id": 123,
             "org_id": 1,
             "outcome": 2,  # RateLimited
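
To make the accounting introduced by this diff easier to follow, here is a minimal, self-contained sketch of the flag's semantics. It deliberately uses simplified stand-in types (`ProfileItem` and a two-variant `DataCategory`) rather than Relay's real `Item`, envelope, or outcome machinery; only the behavior mirrors the change above: a profile is counted once towards `Profile`, and any outcome emitted after that point is reported as `ProfileIndexed`.

```rust
// Standalone sketch (not Relay's actual types): models how a
// "counted as processed" flag switches outcome reporting from
// `Profile` to `ProfileIndexed` once a profile item has been billed.

#[derive(Debug, PartialEq)]
enum DataCategory {
    Profile,
    ProfileIndexed,
}

#[derive(Default)]
struct ProfileItem {
    // Set once the item has been counted towards `DataCategory::Profile`.
    counted_as_processed: bool,
}

impl ProfileItem {
    // Analogous in spirit to `Item::set_profile_counted_as_processed` in the diff.
    fn set_counted_as_processed(&mut self) {
        self.counted_as_processed = true;
    }

    // Analogous to the new `ItemType::Profile` arm of `outcome_category`:
    // once counted, further outcomes go to the indexed category.
    fn outcome_category(&self) -> DataCategory {
        if self.counted_as_processed {
            DataCategory::ProfileIndexed
        } else {
            DataCategory::Profile
        }
    }
}

fn main() {
    let mut item = ProfileItem::default();
    assert_eq!(item.outcome_category(), DataCategory::Profile);

    // Counting the profile once (as `count_processed_profiles` does in the
    // diff) marks the item...
    item.set_counted_as_processed();

    // ...so a later rate-limit or discard outcome is reported as indexed
    // instead of double-counting the processed category.
    assert_eq!(item.outcome_category(), DataCategory::ProfileIndexed);
    println!("ok");
}
```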