Skip to content

Commit

Permalink
fix(profiles): Only use ProfileIndexed after counting as Profile (#…
Browse files Browse the repository at this point in the history
…2071)

In #2056, #2060 and #2061, I wrongly assumed that dropped profiles
should always be counted as `ProfileIndexed` after metrics extraction
and dynamic sampling, in order to not double-count towards `Profile`.
This is wrong, because as the PR description of #2054 clearly states,
profiles that are not dropped by dynamic sampling are only counted as
accepted in processing relays, _after_ dynamic sampling and rate
limiting:

> In processing Relays, if an envelope still contains profiles after
dynamic sampling, log an Accepted outcome for the "processed" category.
By restricting this to processing Relays, we can be sure that every
profile is only counted once.

Instead of checking the `metrics_extracted` flag, introduce a new flag
that explicitly states whether a profile was already counted towards
`DataCategory::Profile` or not, and evaluate that flag instead of
`metrics_extracted`.
  • Loading branch information
jjbayer committed Apr 26, 2023
1 parent 3970043 commit 8c499b5
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- Copy transaction tags to the profile. ([#1982](https://github.com/getsentry/relay/pull/1982))
- Lower default max compressed replay recording segment size to 10 MiB. ([#2031](https://github.com/getsentry/relay/pull/2031))
- Increase chunking limit to 15MB for replay recordings. ([#2032](https://github.com/getsentry/relay/pull/2032))
- Add a data category for indexed profiles. ([#2051](https://github.com/getsentry/relay/pull/2051))
- Add a data category for indexed profiles. ([#2051](https://github.com/getsentry/relay/pull/2051), [#2071](https://github.com/getsentry/relay/pull/2071))
- Differentiate between `Profile` and `ProfileIndexed` outcomes. ([#2054](https://github.com/getsentry/relay/pull/2054))
- Split dynamic sampling implementation before refactoring. ([#2047](https://github.com/getsentry/relay/pull/2047))
- Refactor dynamic sampling implementation across `relay-server` and `relay-sampling`. ([#2066](https://github.com/getsentry/relay/pull/2066))
Expand Down
2 changes: 0 additions & 2 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,6 @@ impl OutcomeBroker {
/// Returns true if the outcome represents profiles dropped by dynamic sampling.
#[cfg(feature = "processing")]
fn is_sampled_profile(outcome: &TrackRawOutcome) -> bool {
// Older external Relays will still emit a `Profile` outcome.
// Newer Relays will emit a `ProfileIndexed` outcome.
(outcome.category == Some(DataCategory::Profile as u8)
|| outcome.category == Some(DataCategory::ProfileIndexed as u8))
&& outcome.outcome == OutcomeId::FILTERED
Expand Down
15 changes: 11 additions & 4 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,10 +1097,13 @@ impl EnvelopeProcessorService {
fn count_processed_profiles(&self, state: &mut ProcessEnvelopeState) {
let profile_count: usize = state
.managed_envelope
.envelope()
.items()
.envelope_mut()
.items_mut()
.filter(|item| item.ty() == &ItemType::Profile)
.map(|item| item.quantity())
.map(|item| {
item.set_profile_counted_as_processed();
item.quantity()
})
.sum();

if profile_count == 0 {
Expand All @@ -1115,7 +1118,11 @@ impl EnvelopeProcessorService {
remote_addr: None,
category: DataCategory::Profile,
quantity: profile_count as u32, // truncates to `u32::MAX`
})
});

// TODO: At this point, we should also ensure that the envelope summary gets recomputed.
// But recomputing the summary after extracting the event is currently problematic, because it
// sets the envelope type to `None`. This needs to be solved in a follow-up.
}

/// Process profiles and set the profile ID in the profile context on the transaction if successful
Expand Down
27 changes: 26 additions & 1 deletion relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,13 @@ pub struct ItemHeaders {
#[serde(default, skip_serializing_if = "is_false")]
metrics_extracted: bool,

/// Internal flag to signal that the profile has been counted toward `DataCategory::Profile`.
///
/// If `true`, outcomes for this item should be reported as `DataCategory::ProfileIndexed`
/// instead.
#[serde(skip)]
profile_counted_as_processed: bool,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
Expand All @@ -507,6 +514,7 @@ impl Item {
timestamp: None,
other: BTreeMap::new(),
metrics_extracted: false,
profile_counted_as_processed: false,
},
payload: Bytes::new(),
}
Expand Down Expand Up @@ -550,7 +558,9 @@ impl Item {
ItemType::Metrics | ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::Profile => Some(if indexed {
// For profiles, do not use the `indexed` flag, because it depends
// on whether metrics were extracted from the _event_.
ItemType::Profile => Some(if self.headers.profile_counted_as_processed {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
Expand Down Expand Up @@ -653,6 +663,21 @@ impl Item {
self.headers.metrics_extracted
}

/// Returns `true` if this profile item has been counted towards `DataCategory::Profile`.
///
/// If `true`, any further outcomes for this item should be reported as
/// `DataCategory::ProfileIndexed` instead, so the profile is not double-counted.
pub fn profile_counted_as_processed(&self) -> bool {
self.headers.profile_counted_as_processed
}

/// Marks the item as "counted towards `DataCategory::Profile`".
///
/// This is a no-op for items that are not of type `ItemType::Profile`, so it
/// is safe to call on arbitrary items. Only compiled with the `processing`
/// feature, i.e. in processing Relays, where profiles are counted as accepted.
#[cfg(feature = "processing")]
pub fn set_profile_counted_as_processed(&mut self) {
if self.ty() == &ItemType::Profile {
self.headers.profile_counted_as_processed = true;
}
}

/// Sets the metrics extracted flag.
pub fn set_metrics_extracted(&mut self, metrics_extracted: bool) {
self.headers.metrics_extracted = metrics_extracted;
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl ManagedEnvelope {
if self.context.summary.profile_quantity > 0 {
self.track_outcome(
outcome,
if self.context.summary.event_metrics_extracted {
if self.context.summary.profile_counted_as_processed {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
Expand Down
13 changes: 11 additions & 2 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ pub struct EnvelopeSummary {
/// Whether the envelope contains an event which already had the metrics extracted.
pub event_metrics_extracted: bool,

/// Whether profiles in the envelope have been counted towards `DataCategory::Profile`.
///
/// If `true`, count them towards `DataCategory::ProfileIndexed` instead.
pub profile_counted_as_processed: bool,

/// The payload size of this envelope.
pub payload_size: usize,
}
Expand All @@ -169,6 +174,10 @@ impl EnvelopeSummary {
summary.event_metrics_extracted = true;
}

if *item.ty() == ItemType::Profile && item.profile_counted_as_processed() {
summary.profile_counted_as_processed = true;
}

// If the item has been rate limited before, the quota has been consumed and outcomes
// emitted. We can skip it here.
if item.rate_limited() {
Expand Down Expand Up @@ -510,7 +519,7 @@ where

// It makes no sense to store profiles without transactions, so if the event
// is rate limited, rate limit profiles as well.
let profile_category = if summary.event_metrics_extracted {
let profile_category = if summary.profile_counted_as_processed {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
Expand Down Expand Up @@ -553,7 +562,7 @@ where
let item_scoping = scoping.item(DataCategory::Profile);
let profile_limits = (self.check)(item_scoping, summary.profile_quantity)?;
enforcement.profiles = CategoryLimit::new(
if summary.event_metrics_extracted {
if summary.profile_counted_as_processed {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,13 +1212,11 @@ def make_envelope():
assert outcomes == expected_outcomes, outcomes


@pytest.mark.parametrize("metrics_already_extracted", [False, True])
@pytest.mark.parametrize("quota_category", ["transaction", "profile"])
def test_profile_outcomes_rate_limited(
mini_sentry,
relay_with_processing,
outcomes_consumer,
metrics_already_extracted,
quota_category,
):
"""
Expand Down Expand Up @@ -1268,7 +1266,7 @@ def test_profile_outcomes_rate_limited(
Item(
payload=PayloadRef(bytes=json.dumps(payload).encode()),
type="transaction",
headers={"metrics_extracted": metrics_already_extracted},
headers={"metrics_extracted": True},
)
)
envelope.add_item(Item(payload=PayloadRef(bytes=profile), type="profile"))
Expand All @@ -1294,7 +1292,7 @@ def test_profile_outcomes_rate_limited(

expected_outcomes += [
{
"category": 11 if metrics_already_extracted else 6,
"category": 6,
"key_id": 123,
"org_id": 1,
"outcome": 2, # RateLimited
Expand Down

0 comments on commit 8c499b5

Please sign in to comment.