diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index d76187d731..27b75d4557 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -850,6 +850,10 @@ impl MetricsContainer for Bucket { fn len(&self) -> usize { self.value.len() } + + fn tag(&self, name: &str) -> Option<&str> { + self.tags.get(name).map(|s| s.as_str()) + } } /// Any error that may occur during aggregation. diff --git a/relay-metrics/src/protocol.rs b/relay-metrics/src/protocol.rs index 2b1b0635cd..a926bdc4c3 100644 --- a/relay-metrics/src/protocol.rs +++ b/relay-metrics/src/protocol.rs @@ -588,6 +588,9 @@ pub trait MetricsContainer { fn is_empty(&self) -> bool { self.len() == 0 } + + /// Returns the value of the given tag, if present. + fn tag(&self, name: &str) -> Option<&str>; } impl MetricsContainer for Metric { @@ -598,6 +601,10 @@ impl MetricsContainer for Metric { fn len(&self) -> usize { 1 } + + fn tag(&self, name: &str) -> Option<&str> { + self.tags.get(name).map(|s| s.as_str()) + } } /// Iterator over parsed metrics returned from [`Metric::parse_all`]. diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs index cf4c549b3f..121b141c06 100644 --- a/relay-server/src/utils/metrics_rate_limits.rs +++ b/relay-server/src/utils/metrics_rate_limits.rs @@ -24,6 +24,9 @@ pub struct MetricsLimiter> = Vec /// The number of transactions contributing to these metrics. transaction_count: usize, + + /// The number of profiles contained in these metrics. + profile_count: usize, } impl>> MetricsLimiter { @@ -31,7 +34,7 @@ impl>> MetricsLimiter { /// /// Returns Ok if `metrics` contain transaction metrics, `metrics` otherwise. pub fn create(buckets: Vec, quotas: Q, scoping: Scoping) -> Result> { - let transaction_counts: Vec<_> = buckets + let counts: Vec<_> = buckets .iter() .map(|metric| { let mri = match MetricResourceIdentifier::parse(metric.name()) { @@ -51,31 +54,35 @@ impl>> MetricsLimiter { // The "duration" metric is extracted exactly once for every processed // transaction, so we can use it to count the number of transactions. let count = metric.len(); - Some(count) + let has_profile = metric.tag("has_profile") == Some("true"); + Some((count, has_profile)) } else { // For any other metric in the transaction namespace, we check the limit with // quantity=0 so transactions are not double counted against the quota. - Some(0) + Some((0, false)) } }) .collect(); // Accumulate the total transaction count: - let transaction_count = transaction_counts - .iter() - .fold(None, |acc, transaction_count| match transaction_count { - Some(count) => Some(acc.unwrap_or(0) + count), - None => acc, - }); + let mut total_counts: Option<(usize, usize)> = None; + for (tx_count, has_profile) in counts.iter().flatten() { + let (total_txs, total_profiles) = total_counts.get_or_insert((0, 0)); + *total_txs += tx_count; + if *has_profile { + *total_profiles += tx_count; + } + } - if let Some(transaction_count) = transaction_count { - let transaction_buckets = transaction_counts.iter().map(Option::is_some).collect(); + if let Some((transaction_count, profile_count)) = total_counts { + let transaction_buckets = counts.iter().map(Option::is_some).collect(); Ok(Self { metrics: buckets, quotas, scoping, transaction_buckets, transaction_count, + profile_count, }) } else { Err(buckets) @@ -115,12 +122,24 @@ impl>> MetricsLimiter { outcome_aggregator.send(TrackOutcome { timestamp, scoping: self.scoping, - outcome, + outcome: outcome.clone(), event_id: None, remote_addr: None, category: DataCategory::Transaction, quantity: self.transaction_count as u32, }); + + if self.profile_count > 0 { + outcome_aggregator.send(TrackOutcome { + timestamp, + scoping: self.scoping, + outcome, + event_id: None, + remote_addr: None, + category: DataCategory::Profile, + quantity: self.profile_count as u32, + }); + } } } @@ -173,3 +192,81 @@ impl>> MetricsLimiter { self.metrics } } + +#[cfg(test)] +mod tests { + use relay_common::{ProjectId, ProjectKey}; + use relay_metrics::{Metric, MetricValue}; + use relay_quotas::{Quota, QuotaScope}; + use smallvec::smallvec; + + use super::*; + + #[test] + fn profiles_limits_are_reported() { + let metrics = vec![ + Metric { + // transaction without profile + timestamp: UnixTimestamp::now(), + name: "d:transactions/duration@millisecond".to_string(), + tags: Default::default(), + value: MetricValue::Distribution(123.0), + }, + Metric { + // transaction with profile + timestamp: UnixTimestamp::now(), + name: "d:transactions/duration@millisecond".to_string(), + tags: [("has_profile".to_string(), "true".to_string())].into(), + value: MetricValue::Distribution(456.0), + }, + Metric { + // unrelated metric + timestamp: UnixTimestamp::now(), + name: "something_else".to_string(), + tags: [("has_profile".to_string(), "true".to_string())].into(), + value: MetricValue::Distribution(123.0), + }, + ]; + let quotas = vec![Quota { + id: None, + categories: smallvec![DataCategory::Transaction], + scope: QuotaScope::Organization, + scope_id: None, + limit: Some(0), + window: None, + reason_code: None, + }]; + let (outcome_sink, mut rx) = Addr::custom(); + + let mut limiter = MetricsLimiter::create( + metrics, + quotas, + Scoping { + organization_id: 1, + project_id: ProjectId::new(1), + project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + key_id: None, + }, + ) + .unwrap(); + + limiter.enforce_limits(Ok(&RateLimits::new()), outcome_sink); + + rx.close(); + + let outcomes: Vec<_> = (0..) + .map(|_| rx.blocking_recv()) + .take_while(|o| o.is_some()) + .flatten() + .map(|o| (o.outcome, o.category, o.quantity)) + .collect(); + + assert_eq!( + outcomes, + vec![ + (Outcome::RateLimited(None), DataCategory::Transaction, 2), + (Outcome::RateLimited(None), DataCategory::Profile, 1) + ] + ); + } +} diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index f2b2bf2302..8017fe8943 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -748,6 +748,18 @@ impl Addr { inner: Box::new(self), } } + + /// Dummy address used for testing. + pub fn custom() -> (Self, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + ( + Addr { + tx, + queue_size: Default::default(), + }, + rx, + ) + } } impl fmt::Debug for Addr {