feat(server): Send metrics via the global endpoint #2902

Merged 10 commits on Jan 2, 2024

Changes from 9 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
- Use a Lua script and in-memory cache for the cardinality limiting to reduce load on Redis. ([#2849](https://github.com/getsentry/relay/pull/2849))
- Extract metrics for file spans. ([#2874](https://github.com/getsentry/relay/pull/2874))
- Add an internal endpoint that allows Relays to submit metrics from multiple projects in a single request. ([#2869](https://github.com/getsentry/relay/pull/2869))
- Introduce the configuration option `http.global_metrics`. When enabled, Relay submits metric buckets not through regular project-scoped Envelopes, but instead through the global endpoint. When this Relay serves a high number of projects, this can reduce the overall request volume. ([#2902](https://github.com/getsentry/relay/pull/2902))
- Emit a `processor.message.duration` metric to assess the throughput of the internal CPU pool. ([#2877](https://github.com/getsentry/relay/pull/2877))

## 23.12.0
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

63 changes: 3 additions & 60 deletions relay-metrics/src/aggregator.rs
@@ -71,13 +71,9 @@ impl BucketKey {
///
/// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
BucketKeyRef {
project_key: self.project_key,
timestamp: self.timestamp,
metric_name: &self.metric_name,
tags: &self.tags,
}
.hash64()
let mut hasher = FnvHasher::default();
Member Author:

BucketKeyRef is no longer required. The hash created here and the hash used for partitioning do not have to be the same, which is why we can hash differently.

std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
}
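For context on the comment above, here is a minimal standalone sketch of the FNV hashing pattern that `hash64` uses, and how a partition index could be derived from such a hash. The tuple key and the partition count of 64 are made-up illustration values, not the PR's actual types:

```rust
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

// Hash any `Hash` value to a stable u64 with FNV, mirroring the
// pattern used in `BucketKey::hash64` above.
fn hash64<T: Hash>(value: &T) -> u64 {
    let mut hasher = FnvHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Hypothetical stand-in for a bucket key: (project key, timestamp, metric name).
    let key = (
        "a94ae32be2584e0bbd7a4cbb95971fee",
        1_700_000_000u64,
        "c:transactions/count@none",
    );

    // A partition index is just the hash reduced modulo the partition count.
    let flush_partitions = 64;
    println!("partition = {}", hash64(&key) % flush_partitions);
}
```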

/// Estimates the number of bytes needed to encode the bucket key.
@@ -97,29 +93,6 @@ impl BucketKey {
}
}

/// Pendant to [`BucketKey`] for referenced data, not owned data.
///
/// This makes it possible to compute a hash for a [`Bucket`]
/// without destructing the bucket into a [`BucketKey`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct BucketKeyRef<'a> {
project_key: ProjectKey,
timestamp: UnixTimestamp,
metric_name: &'a str,
tags: &'a BTreeMap<String, String>,
}

impl<'a> BucketKeyRef<'a> {
/// Creates a 64-bit hash of the bucket key using FnvHasher.
///
/// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
}
}

/// Estimates the number of bytes needed to encode the tags.
///
/// Note that this does not necessarily match the exact memory footprint of the tags,
@@ -888,36 +861,6 @@ impl fmt::Debug for Aggregator {
}
}

/// Splits buckets into N logical partitions, determined by the bucket key.
pub fn partition_buckets(
project_key: ProjectKey,
buckets: impl IntoIterator<Item = Bucket>,
flush_partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<Bucket>> {
let flush_partitions = match flush_partitions {
None => return BTreeMap::from([(None, buckets.into_iter().collect())]),
Some(x) => x.max(1), // handle 0,
};
let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
for bucket in buckets {
let key = BucketKeyRef {
project_key,
timestamp: bucket.timestamp,
metric_name: &bucket.name,
tags: &bucket.tags,
};

let partition_key = key.hash64() % flush_partitions;
partitions
.entry(Some(partition_key))
.or_default()
.push(bucket);

relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key);
}
partitions
}

#[cfg(test)]
mod tests {

8 changes: 0 additions & 8 deletions relay-metrics/src/statsd.rs
@@ -124,13 +124,6 @@ pub enum MetricHistograms {
/// time period (`false`) or after the initial delay has expired (`true`).
BucketsDelay,

///
/// Distribution of flush buckets over partition keys.
///
/// The distribution of buckets should be even.
/// If it is not, this metric should expose it.
PartitionKeys,

/// Distribution of invalid bucket timestamps observed, relative to the time of observation.
///
/// This is a temporary metric to better understand why we see so many invalid timestamp errors.
@@ -143,7 +136,6 @@ impl HistogramMetric for MetricHistograms {
Self::BucketsFlushed => "metrics.buckets.flushed",
Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project",
Self::BucketsDelay => "metrics.buckets.delay",
Self::PartitionKeys => "metrics.buckets.partition_keys",
Self::InvalidBucketTimestamp => "metrics.buckets.invalid_timestamp",
}
}
90 changes: 59 additions & 31 deletions relay-metrics/src/view.rs
@@ -15,6 +15,11 @@ use crate::BucketValue;
/// and buckets larger will be split up.
const BUCKET_SPLIT_FACTOR: usize = 32;

/// The base size of a serialized bucket in bytes.
///
/// This is the size of a bucket's fixed fields in JSON format, excluding the value and tags.
const BUCKET_SIZE: usize = 50;

/// The average size of values when serialized.
const AVG_VALUE_SIZE: usize = 8;

@@ -276,14 +281,10 @@
}
SplitDecision::MoveToNextBatch => break,
SplitDecision::Split(at) => {
// Only certain buckets can be split, if the bucket can't be split,
// move it to the next batch.
if bucket.can_split() {
self.current = Index {
slice: self.current.slice,
bucket: self.current.bucket + at,
};
}
self.current = Index {
Member Author:

The check for whether the bucket can split has been moved into split_at. Invariants for splitting are still checked in the inner iterator.

slice: self.current.slice,
bucket: self.current.bucket + at,
};
break;
}
}
@@ -332,6 +333,7 @@ impl<'a> Serialize for BucketsView<'a> {
/// ```
///
/// A view can be split again into multiple smaller views.
#[derive(Clone)]
pub struct BucketView<'a> {
/// The source bucket.
inner: &'a Bucket,
@@ -427,6 +429,46 @@ impl<'a> BucketView<'a> {
Some(self)
}

/// Estimates the number of bytes needed to serialize the bucket without value.
///
/// Note that this does not match the exact size of the serialized payload. Instead, the size is
/// approximated through tags and a static overhead.
fn estimated_base_size(&self) -> usize {
BUCKET_SIZE + self.name().len() + aggregator::tags_cost(self.tags())
}

/// Estimates the number of bytes needed to serialize the bucket.
///
/// Note that this does not match the exact size of the serialized payload. Instead, the size is
/// approximated through the number of contained values, assuming an average size of serialized
/// values.
pub fn estimated_size(&self) -> usize {
self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
}

/// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
///
/// There are three possible return values:
/// - `(Some, None)` if the bucket fits entirely into the size budget. There is no split.
/// - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no
/// split, the entire bucket is moved.
/// - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket
/// with all other information cloned.
///
/// This is an approximate function. The bucket is not actually serialized, but rather its
/// footprint is estimated through the number of data points contained. See
/// [`estimated_size`](Self::estimated_size) for more information.
pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
match split_at(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
SplitDecision::BucketFits(_) => (Some(self), None),
SplitDecision::MoveToNextBatch => (None, Some(self)),
SplitDecision::Split(at) => {
let Range { start, end } = self.range.clone();
(self.clone().select(start..at), self.select(at..end))
}
}
}
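A brief sketch of how a caller might route the `(Option, Option)` result shapes documented above into the batch being built and the follow-up batch. `Piece` is a stand-in type for illustration, not the actual `BucketView` API:

```rust
// Stand-in for a splittable item such as a bucket view.
type Piece = Vec<u64>;

// Route a split result following the three documented cases:
// fits entirely, does not fit at all, or fits partially.
fn route(split: (Option<Piece>, Option<Piece>), current: &mut Vec<Piece>, next: &mut Vec<Piece>) {
    match split {
        (Some(fits), None) => current.push(fits),
        (None, Some(moved)) => next.push(moved),
        (Some(head), Some(tail)) => {
            current.push(head);
            next.push(tail);
        }
        (None, None) => {} // not produced by a split
    }
}

fn main() {
    let mut current = Vec::new();
    let mut next = Vec::new();
    // A partial fit: the first two values stay, the rest move on.
    route((Some(vec![1, 2]), Some(vec![3, 4, 5])), &mut current, &mut next);
    assert_eq!(current.len(), 1);
    assert_eq!(next.len(), 1);
}
```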

/// Whether the bucket can be split into multiple.
///
/// Only set and distribution buckets can be split.
@@ -624,14 +666,18 @@ enum SplitDecision {
/// `estimate_size` for more information.
fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
// If there's enough space for the entire bucket, do not perform a split.
let bucket_size = estimate_size(bucket);
let bucket_size = bucket.estimated_size();
if max_size >= bucket_size {
return SplitDecision::BucketFits(bucket_size);
}

if !bucket.can_split() {
return SplitDecision::MoveToNextBatch;
}

// If the bucket key can't even fit into the remaining length, move the entire bucket into
// the right-hand side.
let own_size = estimate_base_size(bucket);
let own_size = bucket.estimated_base_size();
if max_size < (own_size + AVG_VALUE_SIZE) {
// split_at must not be zero
return SplitDecision::MoveToNextBatch;
@@ -644,27 +690,9 @@ fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) ->
// Perform a split with the remaining space after adding the key. We assume an average
// length of 8 bytes per value and compute the number of items fitting into the left side.
let split_at = (max_size - own_size) / AVG_VALUE_SIZE;

SplitDecision::Split(split_at)
}
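To make the arithmetic above concrete, a standalone sketch of the sizing model using the constants defined in this file; the metric name, tag cost, and byte budget below are made-up example numbers:

```rust
// Constants as defined in this file.
const BUCKET_SIZE: usize = 50; // fixed serialization overhead per bucket
const AVG_VALUE_SIZE: usize = 8; // assumed average size of one serialized value

// Size of the bucket without its values: overhead + name + tags.
fn estimated_base_size(name: &str, tags_cost: usize) -> usize {
    BUCKET_SIZE + name.len() + tags_cost
}

// Full estimate: base size plus one average-sized slot per value.
fn estimated_size(name: &str, tags_cost: usize, num_values: usize) -> usize {
    estimated_base_size(name, tags_cost) + num_values * AVG_VALUE_SIZE
}

fn main() {
    // Hypothetical distribution bucket: 100 values, ~40 bytes of tags.
    let (name, tags_cost, len) = ("d:custom/my_metric@none", 40, 100);
    let base = estimated_base_size(name, tags_cost);
    let total = estimated_size(name, tags_cost, len);

    // With a 500-byte budget, the left-hand side of a split keeps this many values.
    let max_size = 500;
    let split_at = (max_size - base) / AVG_VALUE_SIZE;
    println!("base={base} total={total} split_at={split_at}");
}
```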

/// Estimates the number of bytes needed to serialize the bucket without value.
///
/// Note that this does not match the exact size of the serialized payload. Instead, the size is
/// approximated through tags and a static overhead.
fn estimate_base_size(bucket: &BucketView<'_>) -> usize {
50 + bucket.name().len() + aggregator::tags_cost(bucket.tags())
}

/// Estimates the number of bytes needed to serialize the bucket.
///
/// Note that this does not match the exact size of the serialized payload. Instead, the size is
/// approximated through the number of contained values, assuming an average size of serialized
/// values.
fn estimate_size(bucket: &BucketView<'_>) -> usize {
estimate_base_size(bucket) + bucket.len() * AVG_VALUE_SIZE
}

#[cfg(test)]
mod tests {
use insta::assert_json_snapshot;
@@ -919,7 +947,7 @@ b3:42:75|s"#;
.by_size(100)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
@@ -945,7 +973,7 @@ b3:42:75|s"#;
.by_size(250)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
@@ -971,7 +999,7 @@ b3:42:75|s"#;
.by_size(500)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
3 changes: 2 additions & 1 deletion relay-server/Cargo.toml
@@ -50,6 +50,7 @@ bytes = { version = "1.4.0" }
chrono = { workspace = true, features = ["clock"] }
data-encoding = "2.3.3"
flate2 = "1.0.19"
fnv = "1.0.7"
futures = { workspace = true }
hash32 = { workspace = true }
hashbrown = { workspace = true }
@@ -95,7 +96,7 @@ rmp-serde = "1.1.1"
rust-embed = { version = "8.0.0", optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
smallvec = { workspace = true, features = ["drain_filter"] }
smallvec = { workspace = true, features = ["drain_filter"] }
sqlx = { version = "0.7.0", features = [
"macros",
"migrate",