Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Add received_at timestamp in bucket metadata #3488

Merged
merged 32 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- Extract `frames.delay` metric from mobile spans. ([#3472](https://github.com/getsentry/relay/pull/3472))
- Consider "Bearer" (case-insensitive) a password. PII will scrub all strings matching that substring. ([#3484](https://github.com/getsentry/relay/pull/3484))
- Add support for `CF-Connecting-IP` header. ([#3496](https://github.com/getsentry/relay/pull/3496))
- Add a `received_at` timestamp to `BucketMetadata`, tracking the earliest time at which the `Bucket` was received. ([#3488](https://github.com/getsentry/relay/pull/3488))

**Internal**:

Expand Down
9 changes: 9 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,17 @@ struct SentryMetrics {
///
/// Defaults to 5.
pub meta_locations_max: usize,
/// Whether to override the `received_at` field in the `BucketMetadata` with the current
/// receive time of the instance.
pub override_received_at_metadata: bool,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In production, we would like to set this option to true only for pop relays, since we want to have the received_at metadata field set to the outermost relay timestamp that receives a given bucket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have that handling in the processor — see `keep_metadata`, for example in `handle_process_metrics` — which should already do the right thing.

}

impl Default for SentryMetrics {
fn default() -> Self {
Self {
meta_locations_expiry: 15 * 24 * 60 * 60,
meta_locations_max: 5,
override_received_at_metadata: false,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, we do not want to override the metadata, to avoid unexpected behavior when the config param is missing.

}
}
}
Expand Down Expand Up @@ -1947,6 +1951,11 @@ impl Config {
Duration::from_secs(self.values.sentry_metrics.meta_locations_expiry)
}

/// Returns whether we want to override the `received_at` field of `BucketMetadata`.
///
/// Backed by the `sentry_metrics.override_received_at_metadata` config option,
/// which defaults to `false`.
pub fn metrics_override_received_at_metadata(&self) -> bool {
self.values.sentry_metrics.override_received_at_metadata
}

/// Returns the default timeout for all upstream HTTP requests.
pub fn http_timeout(&self) -> Duration {
Duration::from_secs(self.values.http.timeout.into())
Expand Down
69 changes: 45 additions & 24 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,14 +899,15 @@ mod tests {
}
}

fn some_bucket() -> Bucket {
fn some_bucket(timestamp: Option<UnixTimestamp>) -> Bucket {
let timestamp = timestamp.map_or(UnixTimestamp::from_secs(999994711), |t| t);
Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
}
}

Expand All @@ -916,7 +917,7 @@ mod tests {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(test_config());

let bucket1 = some_bucket();
let bucket1 = some_bucket(None);

let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.into());
Expand Down Expand Up @@ -1006,7 +1007,7 @@ mod tests {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(config);

let bucket1 = some_bucket();
let bucket1 = some_bucket(None);

let mut bucket2 = bucket1.clone();
bucket2.timestamp = UnixTimestamp::from_secs(999994712);
Expand Down Expand Up @@ -1069,8 +1070,12 @@ mod tests {
let mut aggregator = Aggregator::new(config);

// It's OK to have same metric with different projects:
aggregator.merge(project_key1, some_bucket(), None).unwrap();
aggregator.merge(project_key2, some_bucket(), None).unwrap();
aggregator
.merge(project_key1, some_bucket(None), None)
.unwrap();
aggregator
.merge(project_key2, some_bucket(None), None)
.unwrap();

assert_eq!(aggregator.buckets.len(), 2);
}
Expand Down Expand Up @@ -1151,13 +1156,14 @@ mod tests {
let mut aggregator = Aggregator::new(test_config());
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo@none".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};
let bucket_key = BucketKey {
project_key,
Expand Down Expand Up @@ -1407,13 +1413,14 @@ mod tests {

#[test]
fn test_aggregator_cost_enforcement_total() {
let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator = Aggregator::new(test_config());
Expand All @@ -1438,13 +1445,14 @@ mod tests {
let mut config = test_config();
config.max_project_key_bucket_bytes = Some(1);

let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator = Aggregator::new(config);
Expand Down Expand Up @@ -1475,23 +1483,36 @@ mod tests {
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(config);

let bucket1 = some_bucket();
let bucket2 = some_bucket();
let bucket1 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
let bucket2 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));

// Create a bucket with already 3 merges.
let mut bucket3 = some_bucket();
bucket3.metadata.merge(BucketMetadata::new());
bucket3.metadata.merge(BucketMetadata::new());
// We create a bucket with 3 merges and monotonically increasing timestamps.
let mut bucket3 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999997811)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811)));

aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket3, None).unwrap();
aggregator
.merge(project_key, bucket1.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket2.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket3.clone(), None)
.unwrap();

let buckets: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
insta::assert_debug_snapshot!(buckets, @r###"
let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
insta::assert_debug_snapshot!(buckets_metadata, @r###"
[
BucketMetadata {
merges: 5,
received_at: Some(
UnixTimestamp(999994711),
),
},
]
"###);
Expand Down
5 changes: 3 additions & 2 deletions relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,14 @@ mod tests {
}

/// Builds the fixed counter bucket shared by the aggregator service tests.
///
/// The same timestamp is used for the bucket itself and for its
/// `received_at` metadata.
fn some_bucket() -> Bucket {
    let received = UnixTimestamp::from_secs(999994711);
    Bucket {
        timestamp: received,
        width: 0,
        name: "c:transactions/foo".into(),
        value: BucketValue::counter(42.into()),
        tags: BTreeMap::new(),
        metadata: BucketMetadata::new(received),
    }
}

Expand Down
55 changes: 47 additions & 8 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::Hash;
use std::iter::FusedIterator;
Expand Down Expand Up @@ -502,7 +503,7 @@ pub struct Bucket {
/// # Statsd Format
///
/// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
/// with with the literal character `'T'` followed by the UNIX timestamp.
/// with the literal character `'T'` followed by the UNIX timestamp.
///
/// The timestamp must be a positive integer in decimal notation representing the value of the
/// UNIX timestamp.
Expand Down Expand Up @@ -748,33 +749,55 @@ pub struct BucketMetadata {
/// For example: Merging two un-merged buckets will yield a total
/// of `2` merges.
pub merges: NonZeroU32,

/// Received timestamp of the first metric in this bucket.
///
/// This field should be set to the time at which the first metric of a specific bucket was
/// received by the outermost internal Relay.
pub received_at: Option<UnixTimestamp>,
}

impl BucketMetadata {
/// Creates a fresh metadata instance.
///
/// The new metadata is initialized with `1` merge.
pub fn new() -> Self {
/// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
pub fn new(received_at: UnixTimestamp) -> Self {
Self {
merges: NonZeroU32::MIN,
received_at: Some(received_at),
}
}

/// Whether the metadata does not contain more information than the default.
pub fn is_default(&self) -> bool {
let Self { merges } = self;
*merges == NonZeroU32::MIN
let Self {
merges,
received_at,
} = self;

*merges == NonZeroU32::MIN && received_at.is_none()
}

/// Merges another metadata object into the current one.
pub fn merge(&mut self, other: Self) {
self.merges = self.merges.saturating_add(other.merges.get());
self.received_at = match (self.received_at, other.received_at) {
(Some(received_at), None) => Some(received_at),
(None, Some(received_at)) => Some(received_at),
(Some(left_received_at), Some(right_received_at)) => {
Some(min(left_received_at, right_received_at))
}
(None, None) => None,
};
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl Default for BucketMetadata {
fn default() -> Self {
Self::new()
Self {
merges: NonZeroU32::MIN,
received_at: None,
}
}
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -907,6 +930,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand Down Expand Up @@ -940,6 +964,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand Down Expand Up @@ -991,6 +1016,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand Down Expand Up @@ -1050,6 +1076,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand Down Expand Up @@ -1079,6 +1106,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand All @@ -1102,6 +1130,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
Expand Down Expand Up @@ -1275,7 +1304,9 @@ mod tests {
}
]"#;

let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
let mut buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
buckets[0].metadata = BucketMetadata::new(UnixTimestamp::from_secs(1615889440));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just put this into the json, right?


insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
Expand All @@ -1297,6 +1328,9 @@ mod tests {
},
metadata: BucketMetadata {
merges: 1,
received_at: Some(
UnixTimestamp(1615889440),
),
},
},
]
Expand All @@ -1315,7 +1349,9 @@ mod tests {
}
]"#;

let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
let mut buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
buckets[0].metadata = BucketMetadata::new(UnixTimestamp::from_secs(1615889440));

insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
Expand All @@ -1330,6 +1366,9 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: Some(
UnixTimestamp(1615889440),
),
},
},
]
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/endpoints/batch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub async fn handle(
keep_metadata: body.relay.internal,
start_time: start_time.into_inner(),
sent_at: None,
override_received_at_metadata: state.config().metrics_override_received_at_metadata(),
});

(StatusCode::ACCEPTED, axum::Json(SendMetricsResponse {})).into_response()
Expand Down
3 changes: 3 additions & 0 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ fn queue_envelope(
sent_at: envelope.sent_at(),
project_key: envelope.meta().public_key(),
keep_metadata: envelope.meta().is_from_internal_relay(),
override_received_at_metadata: state
.config()
.metrics_override_received_at_metadata(),
});
}

Expand Down