Skip to content

Commit

Permalink
feat(cardinality): Create outcomes for cardinality limited metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jan 16, 2024
1 parent 0af0a8c commit ace8aeb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Validate error_id and trace_id vectors in replay deserializer. ([#2931](https://github.com/getsentry/relay/pull/2931))
- Add a data category for indexed spans. ([#2937](https://github.com/getsentry/relay/pull/2937))
- Add nested Android app start span ops to span ingestion ([#2927](https://github.com/getsentry/relay/pull/2927))
- Create rate limited outcomes for cardinality limited metrics ([#2947](https://github.com/getsentry/relay/pull/2947))

## 23.12.1

Expand Down
23 changes: 15 additions & 8 deletions relay-cardinality/src/limiter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Relay Cardinality Limiter

use std::collections::HashSet;
use std::collections::BTreeSet;
use std::fmt::{Debug, Display};
use std::hash::Hash;

Expand Down Expand Up @@ -123,9 +123,7 @@ impl<T: Limiter> CardinalityLimiter<T> {
);

let rejections = match result {
Ok(rejections) => rejections
.map(|rejection| rejection.id.0)
.collect::<HashSet<usize>>(),
Ok(rejections) => rejections.map(|rejection| rejection.id.0).collect(),
Err(err) => return Err((items, err)),
};

Expand All @@ -141,10 +139,15 @@ impl<T: Limiter> CardinalityLimiter<T> {
#[derive(Debug)]
pub struct CardinalityLimits<T> {
source: Vec<T>,
rejections: HashSet<usize>,
rejections: BTreeSet<usize>,
}

impl<T> CardinalityLimits<T> {
/// Returns an iterator yielding only rejected items.
pub fn rejected(&self) -> impl Iterator<Item = &T> {
self.rejections.iter().filter_map(|&i| self.source.get(i))
}

/// Consumes the result and returns an iterator over all accepted items.
pub fn into_accepted(self) -> Vec<T> {
if self.rejections.is_empty() {
Expand Down Expand Up @@ -202,20 +205,24 @@ mod tests {
fn test_accepted() {
let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([0, 1, 3]),
rejections: BTreeSet::from([0, 1, 3]),
};
dbg!(limits.rejected().collect::<Vec<_>>());
assert!(limits.rejected().eq(['a', 'b', 'd'].iter()));
assert_eq!(limits.into_accepted(), vec!['c', 'e']);

let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([]),
rejections: BTreeSet::from([]),
};
assert!(limits.rejected().eq([].iter()));
assert_eq!(limits.into_accepted(), vec!['a', 'b', 'c', 'd', 'e']);

let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([0, 1, 2, 3, 4]),
rejections: BTreeSet::from([0, 1, 2, 3, 4]),
};
assert!(limits.rejected().eq(['a', 'b', 'c', 'd', 'e'].iter()));
assert!(limits.into_accepted().is_empty());
}

Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ pub enum Outcome {
/// The event has been rate limited.
RateLimited(Option<ReasonCode>),

/// The event/metric has been cardinality limited.
CardinalityLimited,

/// The event has been discarded because of invalid data.
Invalid(DiscardReason),

Expand All @@ -180,6 +183,7 @@ impl Outcome {
match self {
Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
Outcome::CardinalityLimited => OutcomeId::RATE_LIMITED,
Outcome::Invalid(_) => OutcomeId::INVALID,
Outcome::Abuse => OutcomeId::ABUSE,
Outcome::ClientDiscard(_) => OutcomeId::CLIENT_DISCARD,
Expand All @@ -197,6 +201,7 @@ impl Outcome {
Outcome::RateLimited(code_opt) => code_opt
.as_ref()
.map(|code| Cow::Owned(code.as_str().into())),
Outcome::CardinalityLimited => Some(Cow::Borrowed("cardinality_limited")),
Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)),
Outcome::Abuse => None,
Outcome::Accepted => None,
Expand Down Expand Up @@ -227,6 +232,7 @@ impl fmt::Display for Outcome {
Outcome::FilteredSampling(rule_ids) => write!(f, "sampling rule {rule_ids}"),
Outcome::RateLimited(None) => write!(f, "rate limited"),
Outcome::RateLimited(Some(reason)) => write!(f, "rate limited with reason {reason}"),
Outcome::CardinalityLimited => write!(f, "cardinality limited"),
Outcome::Invalid(DiscardReason::Internal) => write!(f, "internal error"),
Outcome::Invalid(reason) => write!(f, "invalid data ({reason})"),
Outcome::Abuse => write!(f, "abuse limit reached"),
Expand Down
55 changes: 39 additions & 16 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,41 @@ impl EnvelopeProcessorService {
false
}

/// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
#[cfg(feature = "processing")]
fn cardinality_limit_buckets(
&self,
scoping: Scoping,
buckets: Vec<Bucket>,
mode: ExtractionMode,
) -> Vec<Bucket> {
let Some(ref limiter) = self.inner.cardinality_limiter else {
return buckets;
};

let limits = match limiter.check_cardinality_limits(scoping.organization_id, buckets) {
Ok(limits) => limits,
Err((buckets, error)) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"cardinality limiter failed"
);

return buckets;
}
};

// Log outcomes for rejected buckets.
utils::reject_metrics(
&self.inner.outcome_aggregator,
utils::extract_metric_quantities(limits.rejected(), mode),
scoping,
Outcome::CardinalityLimited,
);

limits.into_accepted()
}

/// Processes metric buckets and sends them to kafka.
///
/// This function runs the following steps:
Expand All @@ -1819,27 +1854,15 @@ impl EnvelopeProcessorService {
project_state,
} = message;

if project_state.has_feature(Feature::CardinalityLimiter) {
if let Some(ref limiter) = self.inner.cardinality_limiter {
let org = scoping.organization_id;
buckets = match limiter.check_cardinality_limits(org, buckets) {
Ok(limits) => limits.into_accepted(),
Err((buckets, error)) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"cardinality limiter failed"
);
buckets
}
};
}
}

let mode = match project_state.config.transaction_metrics {
Some(ErrorBoundary::Ok(ref c)) if c.usage_metric() => ExtractionMode::Usage,
_ => ExtractionMode::Duration,
};

if project_state.has_feature(Feature::CardinalityLimiter) {
buckets = self.cardinality_limit_buckets(scoping, buckets, mode);
}

if self.rate_limit_batches(scoping, &buckets, &project_state, mode) {
return;
}
Expand Down

0 comments on commit ace8aeb

Please sign in to comment.