Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(outcomes): Report profiles when transaction metrics dropped #2187

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions relay-metrics/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,10 @@ impl MetricsContainer for Bucket {
fn len(&self) -> usize {
self.value.len()
}

fn tag(&self, name: &str) -> Option<&str> {
self.tags.get(name).map(|s| s.as_str())
}
}

/// Any error that may occur during aggregation.
Expand Down
7 changes: 7 additions & 0 deletions relay-metrics/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ pub trait MetricsContainer {
fn is_empty(&self) -> bool {
self.len() == 0
}

/// Returns the value of the given tag, if present.
fn tag(&self, name: &str) -> Option<&str>;
}

impl MetricsContainer for Metric {
Expand All @@ -598,6 +601,10 @@ impl MetricsContainer for Metric {
fn len(&self) -> usize {
1
}

fn tag(&self, name: &str) -> Option<&str> {
self.tags.get(name).map(|s| s.as_str())
}
}

/// Iterator over parsed metrics returned from [`Metric::parse_all`].
Expand Down
121 changes: 109 additions & 12 deletions relay-server/src/utils/metrics_rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ pub struct MetricsLimiter<M: MetricsContainer, Q: AsRef<Vec<Quota>> = Vec<Quota>

/// The number of transactions contributing to these metrics.
transaction_count: usize,

/// The number of profiles contained in these metrics.
profile_count: usize,
}

impl<M: MetricsContainer, Q: AsRef<Vec<Quota>>> MetricsLimiter<M, Q> {
/// Create a new limiter instance.
///
/// Returns Ok if `metrics` contain transaction metrics, `metrics` otherwise.
pub fn create(buckets: Vec<M>, quotas: Q, scoping: Scoping) -> Result<Self, Vec<M>> {
let transaction_counts: Vec<_> = buckets
let counts: Vec<_> = buckets
.iter()
.map(|metric| {
let mri = match MetricResourceIdentifier::parse(metric.name()) {
Expand All @@ -51,31 +54,35 @@ impl<M: MetricsContainer, Q: AsRef<Vec<Quota>>> MetricsLimiter<M, Q> {
// The "duration" metric is extracted exactly once for every processed
// transaction, so we can use it to count the number of transactions.
let count = metric.len();
Some(count)
let has_profile = metric.tag("has_profile") == Some("true");
Some((count, has_profile))
} else {
// For any other metric in the transaction namespace, we check the limit with
// quantity=0 so transactions are not double counted against the quota.
Some(0)
Some((0, false))
}
})
.collect();

// Accumulate the total transaction count:
let transaction_count = transaction_counts
.iter()
.fold(None, |acc, transaction_count| match transaction_count {
Some(count) => Some(acc.unwrap_or(0) + count),
None => acc,
});
let mut total_counts: Option<(usize, usize)> = None;
for (tx_count, has_profile) in counts.iter().flatten() {
let (total_txs, total_profiles) = total_counts.get_or_insert((0, 0));
*total_txs += tx_count;
if *has_profile {
*total_profiles += tx_count;
}
}

if let Some(transaction_count) = transaction_count {
let transaction_buckets = transaction_counts.iter().map(Option::is_some).collect();
if let Some((transaction_count, profile_count)) = total_counts {
let transaction_buckets = counts.iter().map(Option::is_some).collect();
Ok(Self {
metrics: buckets,
quotas,
scoping,
transaction_buckets,
transaction_count,
profile_count,
})
} else {
Err(buckets)
Expand Down Expand Up @@ -115,12 +122,24 @@ impl<M: MetricsContainer, Q: AsRef<Vec<Quota>>> MetricsLimiter<M, Q> {
outcome_aggregator.send(TrackOutcome {
timestamp,
scoping: self.scoping,
outcome,
outcome: outcome.clone(),
event_id: None,
remote_addr: None,
category: DataCategory::Transaction,
quantity: self.transaction_count as u32,
});

if self.profile_count > 0 {
outcome_aggregator.send(TrackOutcome {
timestamp,
scoping: self.scoping,
outcome,
event_id: None,
remote_addr: None,
category: DataCategory::Profile,
quantity: self.profile_count as u32,
});
}
}
}

Expand Down Expand Up @@ -173,3 +192,81 @@ impl<M: MetricsContainer, Q: AsRef<Vec<Quota>>> MetricsLimiter<M, Q> {
self.metrics
}
}

#[cfg(test)]
mod tests {
use relay_common::{ProjectId, ProjectKey};
use relay_metrics::{Metric, MetricValue};
use relay_quotas::{Quota, QuotaScope};
use smallvec::smallvec;

use super::*;

#[test]
fn profiles_limits_are_reported() {
let metrics = vec![
Metric {
// transaction without profile
timestamp: UnixTimestamp::now(),
name: "d:transactions/duration@millisecond".to_string(),
tags: Default::default(),
value: MetricValue::Distribution(123.0),
},
Metric {
// transaction with profile
timestamp: UnixTimestamp::now(),
name: "d:transactions/duration@millisecond".to_string(),
tags: [("has_profile".to_string(), "true".to_string())].into(),
value: MetricValue::Distribution(456.0),
},
Metric {
// unrelated metric
timestamp: UnixTimestamp::now(),
name: "something_else".to_string(),
tags: [("has_profile".to_string(), "true".to_string())].into(),
value: MetricValue::Distribution(123.0),
},
];
let quotas = vec![Quota {
id: None,
categories: smallvec![DataCategory::Transaction],
scope: QuotaScope::Organization,
scope_id: None,
limit: Some(0),
window: None,
reason_code: None,
}];
let (outcome_sink, mut rx) = Addr::custom();

let mut limiter = MetricsLimiter::create(
metrics,
quotas,
Scoping {
organization_id: 1,
project_id: ProjectId::new(1),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: None,
},
)
.unwrap();

limiter.enforce_limits(Ok(&RateLimits::new()), outcome_sink);

rx.close();

let outcomes: Vec<_> = (0..)
.map(|_| rx.blocking_recv())
.take_while(|o| o.is_some())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL

.flatten()
.map(|o| (o.outcome, o.category, o.quantity))
.collect();

assert_eq!(
outcomes,
vec![
(Outcome::RateLimited(None), DataCategory::Transaction, 2),
(Outcome::RateLimited(None), DataCategory::Profile, 1)
]
);
}
}
12 changes: 12 additions & 0 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,18 @@ impl<I: Interface> Addr<I> {
inner: Box::new(self),
}
}

/// Dummy address used for testing.
pub fn custom() -> (Self, mpsc::UnboundedReceiver<I>) {
let (tx, rx) = mpsc::unbounded_channel();
(
Addr {
tx,
queue_size: Default::default(),
},
rx,
)
}
}

impl<I: Interface> fmt::Debug for Addr<I> {
Expand Down
Loading