fix(metric-stats): Prevent producing multiple metric stats for the same bucket (#3290)

Metric stats facade now takes ownership of a bucket to prevent
mistakenly producing metric stats multiple times for the same bucket.
Dav1dde committed Mar 20, 2024
1 parent a5e8e33 commit 88348d9
Showing 3 changed files with 63 additions and 17 deletions.
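
The commit message above describes moving the metric-stats facade from a borrowed `BucketView<'_>` to an owned `Bucket`. A minimal, self-contained Rust sketch of why that helps (hypothetical stand-in types, not the actual relay-server code): once `track` takes the bucket by value, a second call on the same bucket is a compile error instead of a silent double count.

```rust
// Hypothetical stand-ins for the relay types; only the ownership change is illustrated.
struct Bucket {
    name: String,
}

struct MetricStats;

impl MetricStats {
    // Mirrors the shape of the new signature in this commit: the facade consumes the bucket.
    fn track(&self, bucket: Bucket) {
        println!("tracking volume for {}", bucket.name);
    }
}

fn main() {
    let stats = MetricStats;
    let bucket = Bucket {
        name: "d:custom/foo@none".to_owned(),
    };

    stats.track(bucket);
    // A second call cannot compile, because `bucket` was moved into the first one:
    // stats.track(bucket); // error[E0382]: use of moved value: `bucket`
}
```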
27 changes: 13 additions & 14 deletions relay-server/src/metric_stats.rs
@@ -3,8 +3,7 @@ use std::sync::{Arc, OnceLock};
 
 use relay_config::Config;
 use relay_metrics::{
-    Aggregator, Bucket, BucketValue, BucketView, MergeBuckets, MetricResourceIdentifier,
-    UnixTimestamp,
+    Aggregator, Bucket, BucketValue, MergeBuckets, MetricResourceIdentifier, UnixTimestamp,
 };
 use relay_quotas::Scoping;
 use relay_system::Addr;
@@ -48,19 +47,19 @@ impl MetricStats {
     }
 
     /// Tracks the metric volume and outcome for the bucket.
-    pub fn track(&self, scoping: Scoping, bucket: &BucketView<'_>, outcome: Outcome) {
+    pub fn track(&self, scoping: Scoping, bucket: Bucket, outcome: Outcome) {
         if !self.config.processing_enabled() || !self.is_rolled_out(scoping.organization_id) {
             return;
         }
 
-        let Some(volume) = self.to_volume_metric(bucket, &outcome) else {
+        let Some(volume) = self.to_volume_metric(&bucket, &outcome) else {
             return;
         };
 
         relay_log::trace!(
             "Tracking volume of {} for mri '{}': {}",
-            bucket.metadata().merges.get(),
-            bucket.name(),
+            bucket.metadata.merges.get(),
+            bucket.name,
             outcome
         );
         self.aggregator
@@ -77,21 +76,21 @@ impl MetricStats {
         is_rolled_out(organization_id, rate)
     }
 
-    fn to_volume_metric(&self, bucket: &BucketView<'_>, outcome: &Outcome) -> Option<Bucket> {
-        let volume = bucket.metadata().merges.get();
+    fn to_volume_metric(&self, bucket: &Bucket, outcome: &Outcome) -> Option<Bucket> {
+        let volume = bucket.metadata.merges.get();
         if volume == 0 {
             return None;
         }
 
-        let namespace = MetricResourceIdentifier::parse(bucket.name())
+        let namespace = MetricResourceIdentifier::parse(&bucket.name)
             .ok()?
             .namespace;
         if !namespace.has_metric_stats() {
             return None;
         }
 
         let mut tags = BTreeMap::from([
-            ("mri".to_owned(), bucket.name().to_string()),
+            ("mri".to_owned(), bucket.name.to_string()),
             ("mri.namespace".to_owned(), namespace.to_string()),
             (
                 "outcome.id".to_owned(),
@@ -166,12 +165,12 @@ mod tests {
         let scoping = scoping();
         let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
 
-        ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);
+        ms.track(scoping, bucket.clone(), Outcome::Accepted);
 
         bucket.metadata.merges = bucket.metadata.merges.saturating_add(41);
         ms.track(
             scoping,
-            &BucketView::from(&bucket),
+            bucket,
             Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
         );

@@ -227,7 +226,7 @@ mod tests {
 
         let scoping = scoping();
         let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
-        ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);
+        ms.track(scoping, bucket, Outcome::Accepted);
 
         drop(ms);

@@ -241,7 +240,7 @@
         let scoping = scoping();
         let bucket =
             Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
-        ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);
+        ms.track(scoping, bucket, Outcome::Accepted);
 
         drop(ms);

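
One visible consequence in the updated unit test above: a caller that still needs the bucket after reporting must now clone it explicitly (`bucket.clone()`), so the copy cost shows up at the call site instead of being hidden behind a borrowed view. A small hypothetical sketch of that call pattern:

```rust
// Hypothetical types; only the clone-then-move pattern from the test is illustrated.
#[derive(Clone)]
struct Bucket {
    merges: u32,
}

fn track(bucket: Bucket) {
    println!("volume: {}", bucket.merges);
}

fn main() {
    let mut bucket = Bucket { merges: 1 };

    // First report: the bucket is still needed afterwards, so pass a clone.
    track(bucket.clone());

    // Mutate and report again; ownership can now be given away.
    bucket.merges += 41;
    track(bucket);
}
```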
22 changes: 19 additions & 3 deletions relay-server/src/services/store.rs
@@ -362,10 +362,14 @@ impl StoreService {
         for mut bucket in buckets {
             let namespace = encoder.prepare(&mut bucket);
 
+            let mut has_success = false;
             // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
             // each bucket separately, we only need to split buckets that exceed the size, but not
             // batches.
-            for view in BucketsView::new(&[bucket]).by_size(batch_size).flatten() {
+            for view in BucketsView::new(std::slice::from_ref(&bucket))
+                .by_size(batch_size)
+                .flatten()
+            {
                 let message = self.create_metric_message(
                     scoping.organization_id,
                     scoping.project_id,
@@ -380,14 +384,26 @@
 
                 match result {
                     Ok(()) => {
-                        self.metric_stats.track(scoping, &view, Outcome::Accepted);
+                        has_success = true;
                     }
                     Err(e) => {
                         error.get_or_insert(e);
                         dropped += utils::extract_metric_quantities([view], mode);
                     }
                 }
             }
+
+            // Tracking the volume here is slightly off: some of the bucket views can fail to
+            // produce while others succeed. Since the views are sliced from the original bucket,
+            // we cannot correctly attribute the number of merges (volume) to the slices that
+            // succeeded or failed. -> Attribute the entire volume if at least one slice was
+            // produced successfully.
+            //
+            // This logic will be iterated on and changed once we move serialization logic back
+            // into the processor service.
+            if has_success {
+                self.metric_stats.track(scoping, bucket, Outcome::Accepted);
+            }
         }
 
         if let Some(error) = error {
@@ -834,7 +850,7 @@ impl StoreService {
             );
 
             message.and_then(|message| self.send_metric_message(namespace, message))?;
-            self.metric_stats.track(scoping, &view, Outcome::Accepted);
+            self.metric_stats.track(scoping, bucket, Outcome::Accepted);
         }
 
         Ok(())
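
The producer loop in the diff above splits each bucket into size-limited views but, per the comment, attributes the whole volume once per bucket as long as at least one view was produced. A self-contained sketch of that accounting pattern (hypothetical types, with fixed-size chunks as a stand-in for `BucketsView::by_size`):

```rust
// Hypothetical stand-ins: a bucket is just a payload, and "producing" a view can fail.
struct Bucket {
    payload: Vec<u8>,
}

fn produce(view: &[u8]) -> Result<(), String> {
    if view.is_empty() {
        Err("empty view".to_owned())
    } else {
        Ok(())
    }
}

fn track_volume(bucket: &Bucket) {
    println!("attributing volume for {} bytes exactly once", bucket.payload.len());
}

fn main() {
    let bucket = Bucket {
        payload: vec![1, 2, 3, 4, 5, 6],
    };

    let mut has_success = false;
    // Stand-in for splitting the bucket into size-limited views.
    for view in bucket.payload.chunks(2) {
        if produce(view).is_ok() {
            has_success = true;
        }
    }

    // The bucket's entire volume is reported once, and only if some view made it through.
    if has_success {
        track_volume(&bucket);
    }
}
```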
31 changes: 31 additions & 0 deletions tests/integration/test_metric_stats.py
@@ -1,3 +1,4 @@
+import copy
 from typing import Any
 import pytest
 from attr import dataclass
@@ -78,3 +79,33 @@ def test_metric_stats_simple(
             "outcome.id": "0",
         }
         assert len(metrics.other) == 2
+
+
+def test_metric_stats_max_flush_bytes(
+    mini_sentry, relay_with_processing, metrics_consumer
+):
+    mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0
+
+    metrics_consumer = metrics_consumer()
+
+    relay_config = copy.deepcopy(TEST_CONFIG)
+    relay_config["aggregator"]["max_flush_bytes"] = 150
+
+    relay = relay_with_processing(options=relay_config)
+
+    project_id = 42
+    project_config = mini_sentry.add_basic_project_config(project_id)
+    project_config["config"]["features"] = [
+        "organizations:custom-metrics",
+        "organizations:metric-stats",
+    ]
+
+    # Metric is big enough to be split into multiple smaller metrics when emitting to Kafka,
+    # make sure the volume counted is still just 1.
+    relay.send_metrics(
+        project_id, "custom/foo:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20|d"
+    )
+
+    metrics = metric_stats_by_mri(metrics_consumer, 3)
+    assert metrics.volume["d:custom/foo@none"]["value"] == 1.0
+    assert len(metrics.other) == 2