feat(metrics): Add received_at timestamp in bucket metadata #3488

Merged · 32 commits · May 6, 2024 · Changes shown from 22 commits

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
- Extract `frames.delay` metric from mobile spans. ([#3472](https://github.com/getsentry/relay/pull/3472))
- Consider "Bearer" (case-insensitive) a password. PII will scrub all strings matching that substring. ([#3484](https://github.com/getsentry/relay/pull/3484))
- Add support for `CF-Connecting-IP` header. ([#3496](https://github.com/getsentry/relay/pull/3496))
- Add `received_at` timestamp to `BucketMetadata` to measure the oldest received timestamp of the `Bucket`. ([#3488](https://github.com/getsentry/relay/pull/3488))

**Internal**:

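The changelog entry above summarizes the feature: every `Bucket` now carries metadata recording when its first metric was received, and merging buckets keeps the oldest such timestamp. As a rough orientation before the diffs below, here is a minimal, self-contained sketch of that semantics, using simplified stand-in types rather than Relay's actual `BucketMetadata` and `UnixTimestamp`:

```rust
// Minimal sketch of the semantics described above -- simplified stand-ins
// for Relay's `BucketMetadata` and `UnixTimestamp`, not the actual types.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Metadata {
    merges: u32,
    received_at: Option<u64>, // seconds since the UNIX epoch
}

impl Metadata {
    // A freshly received bucket counts as one merge and records its receive time.
    fn new(received_at: u64) -> Self {
        Self {
            merges: 1,
            received_at: Some(received_at),
        }
    }

    // Merging accumulates merge counts and keeps the *oldest* receive time;
    // a missing timestamp never overrides a known one.
    fn merge(&mut self, other: Self) {
        self.merges += other.merges;
        self.received_at = match (self.received_at, other.received_at) {
            (Some(t), None) | (None, Some(t)) => Some(t),
            (left, right) => left.min(right),
        };
    }
}

fn main() {
    let mut meta = Metadata::new(100);
    meta.merge(Metadata::new(50));
    meta.merge(Metadata::new(200));
    assert_eq!(
        meta,
        Metadata {
            merges: 3,
            received_at: Some(50), // the oldest receive time wins
        }
    );
}
```
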
69 changes: 45 additions & 24 deletions relay-metrics/src/aggregator.rs
@@ -899,14 +899,15 @@ mod tests {
}
}

fn some_bucket() -> Bucket {
fn some_bucket(timestamp: Option<UnixTimestamp>) -> Bucket {
let timestamp = timestamp.unwrap_or(UnixTimestamp::from_secs(999994711));
Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
}
}

@@ -916,7 +917,7 @@
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(test_config());

let bucket1 = some_bucket();
let bucket1 = some_bucket(None);

let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.into());
@@ -1006,7 +1007,7 @@
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(config);

let bucket1 = some_bucket();
let bucket1 = some_bucket(None);

let mut bucket2 = bucket1.clone();
bucket2.timestamp = UnixTimestamp::from_secs(999994712);
@@ -1069,8 +1070,12 @@
let mut aggregator = Aggregator::new(config);

// It's OK to have same metric with different projects:
aggregator.merge(project_key1, some_bucket(), None).unwrap();
aggregator.merge(project_key2, some_bucket(), None).unwrap();
aggregator
.merge(project_key1, some_bucket(None), None)
.unwrap();
aggregator
.merge(project_key2, some_bucket(None), None)
.unwrap();

assert_eq!(aggregator.buckets.len(), 2);
}
@@ -1151,13 +1156,14 @@
let mut aggregator = Aggregator::new(test_config());
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo@none".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};
let bucket_key = BucketKey {
project_key,
@@ -1407,13 +1413,14 @@

#[test]
fn test_aggregator_cost_enforcement_total() {
let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator = Aggregator::new(test_config());
@@ -1438,13 +1445,14 @@
let mut config = test_config();
config.max_project_key_bucket_bytes = Some(1);

let timestamp = UnixTimestamp::from_secs(999994711);
let bucket = Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
};

let mut aggregator = Aggregator::new(config);
@@ -1475,23 +1483,36 @@
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let mut aggregator = Aggregator::new(config);

let bucket1 = some_bucket();
let bucket2 = some_bucket();
let bucket1 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
let bucket2 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));

// Create a bucket with already 3 merges.
let mut bucket3 = some_bucket();
bucket3.metadata.merge(BucketMetadata::new());
bucket3.metadata.merge(BucketMetadata::new());
// We create a bucket with 3 merges and monotonically increasing timestamps.
let mut bucket3 = some_bucket(Some(UnixTimestamp::from_secs(999994711)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999997811)));
bucket3
.metadata
.merge(BucketMetadata::new(UnixTimestamp::from_secs(999999811)));

aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();
aggregator.merge(project_key, bucket3, None).unwrap();
aggregator
.merge(project_key, bucket1.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket2.clone(), None)
.unwrap();
aggregator
.merge(project_key, bucket3.clone(), None)
.unwrap();

let buckets: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
insta::assert_debug_snapshot!(buckets, @r###"
let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
insta::assert_debug_snapshot!(buckets_metadata, @r###"
[
BucketMetadata {
merges: 5,
received_at: Some(
UnixTimestamp(999994711),
),
},
]
"###);
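The snapshot above follows directly from the merge rule: bucket1 and bucket2 contribute one merge each, bucket3 contributes three (its metadata was merged twice before insertion), and the oldest receive timestamp survives. Reusing the simplified `Metadata` sketch from after the CHANGELOG diff (not Relay's real types), the same arithmetic looks like:

```rust
// Hypothetical check, reusing the simplified `Metadata` sketch above.
fn snapshot_arithmetic() {
    let mut total = Metadata::new(999_994_711); // bucket1
    total.merge(Metadata::new(999_994_711));    // bucket2

    // bucket3 already carries two extra merges with later receive times.
    let mut bucket3 = Metadata::new(999_994_711);
    bucket3.merge(Metadata::new(999_997_811));
    bucket3.merge(Metadata::new(999_999_811));

    total.merge(bucket3);
    assert_eq!(
        total,
        Metadata {
            merges: 5,                       // 1 + 1 + 3
            received_at: Some(999_994_711),  // the oldest timestamp is kept
        }
    );
}
```
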
5 changes: 3 additions & 2 deletions relay-metrics/src/aggregatorservice.rs
@@ -471,13 +471,14 @@ mod tests {
}

fn some_bucket() -> Bucket {
let timestamp = UnixTimestamp::from_secs(999994711);
Bucket {
timestamp: UnixTimestamp::from_secs(999994711),
timestamp,
width: 0,
name: "c:transactions/foo".into(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
metadata: BucketMetadata::new(timestamp),
}
}

86 changes: 78 additions & 8 deletions relay-metrics/src/bucket.rs
@@ -502,7 +502,7 @@ pub struct Bucket {
/// # Statsd Format
///
/// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
/// with with the literal character `'T'` followed by the UNIX timestamp.
/// with the literal character `'T'` followed by the UNIX timestamp.
///
/// The timestamp must be a positive integer in decimal notation representing the value of the
/// UNIX timestamp.
@@ -748,33 +748,52 @@ pub struct BucketMetadata {
/// For example: Merging two un-merged buckets will yield a total
/// of `2` merges.
pub merges: NonZeroU32,

/// Received timestamp of the first metric in this bucket.
///
/// This field should be set to the time at which the first metric of a specific bucket was
/// received by the outermost internal Relay.
pub received_at: Option<UnixTimestamp>,
}

impl BucketMetadata {
/// Creates a fresh metadata instance.
///
/// The new metadata is initialized with `1` merge.
pub fn new() -> Self {
/// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
pub fn new(received_at: UnixTimestamp) -> Self {
Self {
merges: NonZeroU32::MIN,
received_at: Some(received_at),
}
}

/// Whether the metadata does not contain more information than the default.
pub fn is_default(&self) -> bool {
let Self { merges } = self;
*merges == NonZeroU32::MIN
let Self {
merges,
received_at,
} = self;

*merges == NonZeroU32::MIN && received_at.is_none()
}

/// Merges another metadata object into the current one.
pub fn merge(&mut self, other: Self) {
self.merges = self.merges.saturating_add(other.merges.get());
self.received_at = match (self.received_at, other.received_at) {
(Some(received_at), None) => Some(received_at),
(None, Some(received_at)) => Some(received_at),
(left, right) => left.min(right),
};
}
}

impl Default for BucketMetadata {
fn default() -> Self {
Self::new()
Self {
merges: NonZeroU32::MIN,
received_at: None,
}
}
}

@@ -907,6 +926,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -940,6 +960,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -991,6 +1012,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -1050,6 +1072,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -1079,6 +1102,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -1102,6 +1126,7 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: None,
},
}
"###);
@@ -1275,7 +1300,9 @@ mod tests {
}
]"#;

let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
let mut buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
buckets[0].metadata = BucketMetadata::new(UnixTimestamp::from_secs(1615889440));

Review comment (Member): You can just put this into the json, right?

insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
Expand All @@ -1297,6 +1324,9 @@ mod tests {
},
metadata: BucketMetadata {
merges: 1,
received_at: Some(
UnixTimestamp(1615889440),
),
},
},
]
Expand All @@ -1315,7 +1345,9 @@ mod tests {
}
]"#;

let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
let mut buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
buckets[0].metadata = BucketMetadata::new(UnixTimestamp::from_secs(1615889440));

insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
Expand All @@ -1330,6 +1362,9 @@ mod tests {
tags: {},
metadata: BucketMetadata {
merges: 1,
received_at: Some(
UnixTimestamp(1615889440),
),
},
},
]
@@ -1407,4 +1442,39 @@ mod tests {
let serialized = serde_json::to_string_pretty(&buckets).unwrap();
assert_eq!(json, serialized);
}

#[test]
fn test_bucket_metadata_merge() {
let mut metadata = BucketMetadata::default();

let other_metadata = BucketMetadata::default();
metadata.merge(other_metadata);
assert_eq!(
metadata,
BucketMetadata {
merges: NonZeroU32::new(2).unwrap(),
received_at: None
}
);

let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
metadata.merge(other_metadata);
assert_eq!(
metadata,
BucketMetadata {
merges: NonZeroU32::new(3).unwrap(),
received_at: Some(UnixTimestamp::from_secs(10))
}
);

let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
metadata.merge(other_metadata);
assert_eq!(
metadata,
BucketMetadata {
merges: NonZeroU32::new(4).unwrap(),
received_at: Some(UnixTimestamp::from_secs(10))
}
);
}
}
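
One behavioral consequence of the new field worth noting: `BucketMetadata::default()` still counts as "default" metadata (one merge, no timestamp), while `BucketMetadata::new(timestamp)` never does, because `is_default` now also requires `received_at` to be `None`. A hedged illustration, again in terms of the simplified `Metadata` sketch introduced after the CHANGELOG diff rather than Relay's actual types:

```rust
// Hypothetical continuation of the simplified `Metadata` sketch above,
// mirroring the `is_default` logic added in the diff.
impl Metadata {
    fn is_default(&self) -> bool {
        self.merges == 1 && self.received_at.is_none()
    }
}

fn default_semantics() {
    // Mirrors `BucketMetadata::default()`: one merge, no receive timestamp.
    let fresh = Metadata { merges: 1, received_at: None };
    assert!(fresh.is_default());

    // Mirrors `BucketMetadata::new(..)`: stamped on receive, never "default".
    let received = Metadata::new(1_615_889_440);
    assert!(!received.is_default());
}
```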