From 88348d93d76e73d25394dfc62b9099bbc8f72911 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 20 Mar 2024 14:59:22 +0100 Subject: [PATCH] fix(metric-stats): Prevent producing multiple metric stats for the same bucket (#3290) Metric stats facade now takes ownership of a bucket to prevent mistakenly producing metric stats multiple times for the same bucket. --- relay-server/src/metric_stats.rs | 27 +++++++++++----------- relay-server/src/services/store.rs | 22 +++++++++++++++--- tests/integration/test_metric_stats.py | 31 ++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 17 deletions(-) diff --git a/relay-server/src/metric_stats.rs b/relay-server/src/metric_stats.rs index 15e3f82eb2..9d9280cd93 100644 --- a/relay-server/src/metric_stats.rs +++ b/relay-server/src/metric_stats.rs @@ -3,8 +3,7 @@ use std::sync::{Arc, OnceLock}; use relay_config::Config; use relay_metrics::{ - Aggregator, Bucket, BucketValue, BucketView, MergeBuckets, MetricResourceIdentifier, - UnixTimestamp, + Aggregator, Bucket, BucketValue, MergeBuckets, MetricResourceIdentifier, UnixTimestamp, }; use relay_quotas::Scoping; use relay_system::Addr; @@ -48,19 +47,19 @@ impl MetricStats { } /// Tracks the metric volume and outcome for the bucket. 
- pub fn track(&self, scoping: Scoping, bucket: &BucketView<'_>, outcome: Outcome) { + pub fn track(&self, scoping: Scoping, bucket: Bucket, outcome: Outcome) { if !self.config.processing_enabled() || !self.is_rolled_out(scoping.organization_id) { return; } - let Some(volume) = self.to_volume_metric(bucket, &outcome) else { + let Some(volume) = self.to_volume_metric(&bucket, &outcome) else { return; }; relay_log::trace!( "Tracking volume of {} for mri '{}': {}", - bucket.metadata().merges.get(), - bucket.name(), + bucket.metadata.merges.get(), + bucket.name, outcome ); self.aggregator @@ -77,13 +76,13 @@ impl MetricStats { is_rolled_out(organization_id, rate) } - fn to_volume_metric(&self, bucket: &BucketView<'_>, outcome: &Outcome) -> Option<Bucket> { - let volume = bucket.metadata().merges.get(); + fn to_volume_metric(&self, bucket: &Bucket, outcome: &Outcome) -> Option<Bucket> { + let volume = bucket.metadata.merges.get(); if volume == 0 { return None; } - let namespace = MetricResourceIdentifier::parse(bucket.name()) + let namespace = MetricResourceIdentifier::parse(&bucket.name) .ok()? 
.namespace; if !namespace.has_metric_stats() { @@ -91,7 +90,7 @@ impl MetricStats { } let mut tags = BTreeMap::from([ - ("mri".to_owned(), bucket.name().to_string()), + ("mri".to_owned(), bucket.name.to_string()), ("mri.namespace".to_owned(), namespace.to_string()), ( "outcome.id".to_owned(), @@ -166,12 +165,12 @@ mod tests { let scoping = scoping(); let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted); + ms.track(scoping, bucket.clone(), Outcome::Accepted); bucket.metadata.merges = bucket.metadata.merges.saturating_add(41); ms.track( scoping, - &BucketView::from(&bucket), + bucket, Outcome::RateLimited(Some(ReasonCode::new("foobar"))), ); @@ -227,7 +226,7 @@ mod tests { let scoping = scoping(); let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted); + ms.track(scoping, bucket, Outcome::Accepted); drop(ms); @@ -241,7 +240,7 @@ mod tests { let scoping = scoping(); let bucket = Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap(); - ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted); + ms.track(scoping, bucket, Outcome::Accepted); drop(ms); diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 6f1d06dd5d..2cd776dbd9 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -362,10 +362,14 @@ impl StoreService { for mut bucket in buckets { let namespace = encoder.prepare(&mut bucket); + let mut has_success = false; // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce // each bucket separately, we only need to split buckets that exceed the size, but not // batches. 
- for view in BucketsView::new(&[bucket]).by_size(batch_size).flatten() { + for view in BucketsView::new(std::slice::from_ref(&bucket)) + .by_size(batch_size) + .flatten() + { let message = self.create_metric_message( scoping.organization_id, scoping.project_id, @@ -380,7 +384,7 @@ impl StoreService { match result { Ok(()) => { - self.metric_stats.track(scoping, &view, Outcome::Accepted); + has_success = true; } Err(e) => { error.get_or_insert(e); @@ -388,6 +392,18 @@ } } } + + // Tracking the volume here is slightly off, only one of the multiple bucket views can + // fail to produce. Since the views are sliced from the original bucket we cannot + // correctly attribute the amount of merges (volume) to the amount of slices that + // succeeded or not. -> Attribute the entire volume if at least one slice successfully + // produced. + // + // This logic will be iterated on and changed once we move serialization logic + // back into the processor service. + if has_success { + self.metric_stats.track(scoping, bucket, Outcome::Accepted); + } } if let Some(error) = error { @@ -834,7 +850,7 @@ impl StoreService { ); message.and_then(|message| self.send_metric_message(namespace, message))?; - self.metric_stats.track(scoping, &view, Outcome::Accepted); + self.metric_stats.track(scoping, bucket, Outcome::Accepted); } Ok(()) diff --git a/tests/integration/test_metric_stats.py b/tests/integration/test_metric_stats.py index 5c3522dc06..d7b73fa0d7 100644 --- a/tests/integration/test_metric_stats.py +++ b/tests/integration/test_metric_stats.py @@ -1,3 +1,4 @@ +import copy from typing import Any import pytest from attr import dataclass @@ -78,3 +79,33 @@ def test_metric_stats_simple( "outcome.id": "0", } assert len(metrics.other) == 2 + + +def test_metric_stats_max_flush_bytes( + mini_sentry, relay_with_processing, metrics_consumer ): + mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0 + + metrics_consumer = 
metrics_consumer() + + relay_config = copy.deepcopy(TEST_CONFIG) + relay_config["aggregator"]["max_flush_bytes"] = 150 + + relay = relay_with_processing(options=relay_config) + + project_id = 42 + project_config = mini_sentry.add_basic_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:custom-metrics", + "organizations:metric-stats", + ] + + # Metric is big enough to be split into multiple smaller metrics when emitting to Kafka, + # make sure the volume counted is still just 1. + relay.send_metrics( + project_id, "custom/foo:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20|d" + ) + + metrics = metric_stats_by_mri(metrics_consumer, 3) + assert metrics.volume["d:custom/foo@none"]["value"] == 1.0 + assert len(metrics.other) == 2