From 2a75501215805d83934392fa45559d26325c2dae Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Thu, 3 Mar 2022 14:28:45 +0100
Subject: [PATCH 1/3] ref(metrics): Tag backdated bucket creations in statsd

---
 relay-metrics/src/aggregation.rs | 51 ++++++++++++++++++++------------
 1 file changed, 32 insertions(+), 19 deletions(-)

diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs
index 9a89c6e94b..381824515a 100644
--- a/relay-metrics/src/aggregation.rs
+++ b/relay-metrics/src/aggregation.rs
@@ -834,32 +834,41 @@ impl AggregatorConfig {
         Ok(output_timestamp)
     }
 
-    /// Returns the instant at which a bucket should be flushed.
+    /// Returns the instant at which a bucket is initially flushed.
     ///
-    /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
-    /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
-    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
-        let now = Instant::now();
-
+    /// This instant is in the past if the bucket timestamp is backdated.
+    fn get_initial_flush(&self, bucket_timestamp: UnixTimestamp) -> Option<Instant> {
         if let MonotonicResult::Instant(instant) = bucket_timestamp.to_instant() {
             let bucket_end = instant + self.bucket_interval();
             let initial_flush = bucket_end + self.initial_delay();
             // If the initial flush is still pending, use that.
-            if initial_flush > now {
-                // Shift deterministically within one bucket interval based on the project key. This
-                // distributes buckets over time while also flushing all buckets of the same project
-                // key together.
-                let mut hasher = FnvHasher::default();
-                hasher.write(project_key.as_str().as_bytes());
-                let shift_millis = u64::from(hasher.finish()) % (self.bucket_interval * 1000);
-
-                return initial_flush + Duration::from_millis(shift_millis);
+            if initial_flush > Instant::now() {
+                return Some(initial_flush);
             }
         }
 
+        None
+    }
+
+    /// Returns the instant at which a bucket should be flushed.
+    ///
+    /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
+    /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
+    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
+        if let Some(initial_flush) = self.get_initial_flush(bucket_timestamp) {
+            // Shift deterministically within one bucket interval based on the project key. This
+            // distributes buckets over time while also flushing all buckets of the same project
+            // key together.
+            let mut hasher = FnvHasher::default();
+            hasher.write(project_key.as_str().as_bytes());
+            let shift_millis = u64::from(hasher.finish()) % (self.bucket_interval * 1000);
+
+            return initial_flush + Duration::from_millis(shift_millis);
+        }
+
         // If the initial flush time has passed or cannot be represented, debounce future flushes
         // with the `debounce_delay` starting now.
-        now + self.debounce_delay()
+        Instant::now() + self.debounce_delay()
     }
 }
 
@@ -1043,20 +1052,24 @@ impl Aggregator {
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeHit) += 1,
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name
+                    metric_name = &entry.key().metric_name,
                 );
                 value.merge_into(&mut entry.get_mut().value)?;
             }
             Entry::Vacant(entry) => {
+                let backdated = self.config.get_initial_flush(timestamp).is_none();
+
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeMiss) += 1,
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name
+                    metric_name = &entry.key().metric_name,
+                    backdated = if backdated { "true" } else { "false" },
                 );
                 relay_statsd::metric!(
                     set(MetricSets::UniqueBucketsCreated) = entry.key().as_integer_lossy(),
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name
+                    metric_name = &entry.key().metric_name,
+                    backdated = if backdated { "true" } else { "false" },
                 );
 
                 let flush_at = self.config.get_flush_time(timestamp, project_key);
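
Illustration (not part of the patch): the deterministic flush shift in
`get_flush_time` can be reproduced standalone. This sketch assumes the `fnv`
crate and a hypothetical `BUCKET_INTERVAL_SECS` constant standing in for
`AggregatorConfig::bucket_interval`:

    use std::hash::Hasher;

    use fnv::FnvHasher;

    // Hypothetical stand-in for `AggregatorConfig::bucket_interval` (seconds).
    const BUCKET_INTERVAL_SECS: u64 = 10;

    /// Maps a project key to a stable millisecond offset within one bucket
    /// interval. Equal keys always produce equal shifts, so all buckets of a
    /// project flush together while different projects spread across the window.
    fn flush_shift_millis(project_key: &str) -> u64 {
        let mut hasher = FnvHasher::default();
        hasher.write(project_key.as_bytes());
        hasher.finish() % (BUCKET_INTERVAL_SECS * 1000)
    }

    fn main() {
        let shift = flush_shift_millis("a94ae32be2584e0bbd7a4cbb95971fee");
        assert!(shift < BUCKET_INTERVAL_SECS * 1000);
        println!("project flushes {shift} ms into the interval");
    }
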
From 347ca1b83da93845a61e7496fbbb632c4d100abb Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Thu, 3 Mar 2022 14:32:15 +0100
Subject: [PATCH 2/3] meta: Docs and changelog

---
 CHANGELOG.md                | 2 +-
 relay-metrics/src/statsd.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d4b84fa46..1334190eb8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,7 @@
 **Internal**:
 
 - Spread out metric aggregation over the aggregation window to avoid concentrated waves of metrics requests to the upstream every 10 seconds. Relay now applies jitter to `initial_delay` to spread out requests more evenly over time. ([#1185](https://github.com/getsentry/relay/pull/1185))
-- Add new statsd metrics for bucketing efficiency ([#1199](https://github.com/getsentry/relay/pull/1199), [#1192](https://github.com/getsentry/relay/pull/1192))
+- Add new statsd metrics for bucketing efficiency ([#1199](https://github.com/getsentry/relay/pull/1199), [#1192](https://github.com/getsentry/relay/pull/1192), [#1200](https://github.com/getsentry/relay/pull/1200))
 
 ## 22.2.0

diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs
index f70c7e5e38..de969f51d0 100644
--- a/relay-metrics/src/statsd.rs
+++ b/relay-metrics/src/statsd.rs
@@ -35,7 +35,7 @@ pub enum MetricCounters {
 
     /// Incremented every time a bucket is created.
     ///
-    /// Tagged by metric type and name.
+    /// Tagged by metric type, name, and a `backdated` flag.
     MergeMiss,
 
     /// Incremented every time a bucket is dropped.
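
Illustration (not part of the patch): what the `backdated` flag documented
above means. A bucket counts as backdated when its initial flush, computed as
`bucket_end + initial_delay`, already lies in the past at creation time. A
sketch using std time types and assumed delay constants in place of the real
`AggregatorConfig` values:

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // Assumed values; the real ones come from `AggregatorConfig`.
    const BUCKET_INTERVAL: Duration = Duration::from_secs(10);
    const INITIAL_DELAY: Duration = Duration::from_secs(30);

    /// A bucket is backdated when its initial flush already lies in the past.
    fn is_backdated(bucket_timestamp_secs: u64) -> bool {
        let bucket_start = UNIX_EPOCH + Duration::from_secs(bucket_timestamp_secs);
        let initial_flush = bucket_start + BUCKET_INTERVAL + INITIAL_DELAY;
        initial_flush <= SystemTime::now()
    }

    fn main() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before 1970")
            .as_secs();
        // A bucket from ten minutes ago is backdated; a current one is not.
        assert!(is_backdated(now_secs - 600));
        assert!(!is_backdated(now_secs));
    }
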
From 08e1279c4ad4fe15ddedcbcad75dd1224768cc39 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Fri, 4 Mar 2022 08:51:56 +0100
Subject: [PATCH 3/3] ref(metrics): Switch to a histogram of bucket delays instead

---
 relay-metrics/src/aggregation.rs | 59 +++++++++++++++-----------------
 relay-metrics/src/statsd.rs      | 15 +++++++-
 2 files changed, 42 insertions(+), 32 deletions(-)

diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs
index 381824515a..8fa4163df6 100644
--- a/relay-metrics/src/aggregation.rs
+++ b/relay-metrics/src/aggregation.rs
@@ -834,41 +834,42 @@ impl AggregatorConfig {
         Ok(output_timestamp)
     }
 
-    /// Returns the instant at which a bucket is initially flushed.
+    /// Returns the instant at which a bucket should be flushed.
     ///
-    /// This instant is in the past if the bucket timestamp is backdated.
-    fn get_initial_flush(&self, bucket_timestamp: UnixTimestamp) -> Option<Instant> {
+    /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
+    /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
+    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
+        let now = Instant::now();
+        let mut flush = None;
+
         if let MonotonicResult::Instant(instant) = bucket_timestamp.to_instant() {
             let bucket_end = instant + self.bucket_interval();
             let initial_flush = bucket_end + self.initial_delay();
             // If the initial flush is still pending, use that.
-            if initial_flush > Instant::now() {
-                return Some(initial_flush);
+            if initial_flush > now {
+                // Shift deterministically within one bucket interval based on the project key. This
+                // distributes buckets over time while also flushing all buckets of the same project
+                // key together.
+                let mut hasher = FnvHasher::default();
+                hasher.write(project_key.as_str().as_bytes());
+                let shift_millis = u64::from(hasher.finish()) % (self.bucket_interval * 1000);
+
+                flush = Some(initial_flush + Duration::from_millis(shift_millis));
             }
         }
 
-        None
-    }
-
-    /// Returns the instant at which a bucket should be flushed.
-    ///
-    /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
-    /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
-    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
-        if let Some(initial_flush) = self.get_initial_flush(bucket_timestamp) {
-            // Shift deterministically within one bucket interval based on the project key. This
-            // distributes buckets over time while also flushing all buckets of the same project
-            // key together.
-            let mut hasher = FnvHasher::default();
-            hasher.write(project_key.as_str().as_bytes());
-            let shift_millis = u64::from(hasher.finish()) % (self.bucket_interval * 1000);
-
-            return initial_flush + Duration::from_millis(shift_millis);
-        }
+        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_timestamp.as_secs() as i64;
+        relay_statsd::metric!(
+            histogram(MetricHistograms::BucketsDelay) = delay as f64,
+            backdated = if flush.is_none() { "true" } else { "false" },
+        );
 
         // If the initial flush time has passed or cannot be represented, debounce future flushes
         // with the `debounce_delay` starting now.
-        Instant::now() + self.debounce_delay()
+        match flush {
+            Some(initial_flush) => initial_flush,
+            None => now + self.debounce_delay(),
+        }
     }
 }
 
@@ -1052,24 +1053,20 @@ impl Aggregator {
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeHit) += 1,
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name,
+                    metric_name = &entry.key().metric_name
                 );
                 value.merge_into(&mut entry.get_mut().value)?;
             }
             Entry::Vacant(entry) => {
-                let backdated = self.config.get_initial_flush(timestamp).is_none();
-
                 relay_statsd::metric!(
                     counter(MetricCounters::MergeMiss) += 1,
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name,
-                    backdated = if backdated { "true" } else { "false" },
+                    metric_name = &entry.key().metric_name
                 );
                 relay_statsd::metric!(
                     set(MetricSets::UniqueBucketsCreated) = entry.key().as_integer_lossy(),
                     metric_type = entry.key().metric_type.as_str(),
-                    metric_name = &entry.key().metric_name,
-                    backdated = if backdated { "true" } else { "false" },
+                    metric_name = &entry.key().metric_name
                 );
 
                 let flush_at = self.config.get_flush_time(timestamp, project_key);
diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs
index de969f51d0..539a530730 100644
--- a/relay-metrics/src/statsd.rs
+++ b/relay-metrics/src/statsd.rs
@@ -35,7 +35,7 @@ pub enum MetricCounters {
 
     /// Incremented every time a bucket is created.
     ///
-    /// Tagged by metric type, name, and a `backdated` flag.
+    /// Tagged by metric type and name.
     MergeMiss,
 
     /// Incremented every time a bucket is dropped.
@@ -91,6 +91,18 @@ pub enum MetricHistograms {
     /// BucketRelativeSize measures how many distinct values are in a bucket and therefore
     /// BucketRelativeSize gives you a measurement of the bucket size and complexity.
     BucketRelativeSize,
+
+    /// The reporting delay at which a bucket arrives in Relay.
+    ///
+    /// A positive delay indicates that the bucket arrives after its stated timestamp. Large delays
+    /// indicate backdating; in particular, any delay larger than `bucket_interval + initial_delay`
+    /// marks the bucket as backdated. Negative delays indicate that the bucket is dated into the
+    /// future, likely due to clock drift on the client.
+    ///
+    /// This metric is tagged with:
+    ///  - `backdated`: A flag indicating whether the metric was reported within the `initial_delay`
+    ///    time period (`false`) or after the initial delay has expired (`true`).
+    BucketsDelay,
 }
 
 impl HistogramMetric for MetricHistograms {
@@ -99,6 +111,7 @@ impl HistogramMetric for MetricHistograms {
             Self::BucketsFlushed => "metrics.buckets.flushed",
             Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project",
             Self::BucketRelativeSize => "metrics.buckets.relative_bucket_size",
+            Self::BucketsDelay => "metrics.buckets.delay",
         }
     }
 }
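
Illustration (not part of the patch): the value recorded in
`metrics.buckets.delay` is the signed difference in seconds between wall-clock
now and the bucket's stated timestamp. A sketch using std time types instead
of relay's `UnixTimestamp`:

    use std::time::{SystemTime, UNIX_EPOCH};

    /// Signed reporting delay in seconds: positive when the bucket arrives after
    /// its stated timestamp, negative when it is dated into the future.
    fn bucket_delay_secs(bucket_timestamp_secs: u64) -> i64 {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before 1970")
            .as_secs();
        now_secs as i64 - bucket_timestamp_secs as i64
    }

    fn main() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before 1970")
            .as_secs();
        assert_eq!(bucket_delay_secs(now_secs - 60), 60); // backdated by a minute
        assert_eq!(bucket_delay_secs(now_secs + 60), -60); // client clock runs ahead
    }
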