feat(metrics): Actually batch by partition, set request header [INGEST-1562] #1440

Merged · 6 commits · Aug 31, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
- Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410))
- Update store service to use generic Addr and minor changes to generic Addr. ([#1415](https://github.com/getsentry/relay/pull/1415))
- Added new Register for the Services that is initialized later than the current. ([#1421](https://github.com/getsentry/relay/pull/1421))
- Batch metrics buckets into logical partitions before sending them as envelopes. ([#1440](https://github.com/getsentry/relay/pull/1440))

**Features**:

202 changes: 84 additions & 118 deletions relay-metrics/src/aggregation.rs
@@ -1,3 +1,4 @@
use std::cmp::max;
use std::collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap};

use std::fmt;
@@ -940,7 +941,7 @@ pub struct AggregatorConfig {
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
/// by setting the header `X-Relay-Shard`.
/// by setting the header `X-Sentry-Relay-Shard`.
pub flush_partitions: Option<u64>,

/// The age in seconds of the oldest allowed bucket timestamp.
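As a sketch of the partitioning rule described in the doc comment above (mirroring the `partition_buckets` logic further down in this diff; the helper name here is hypothetical):

```rust
/// Hypothetical helper illustrating the rule from the doc comment above:
/// a bucket's hashed key modulo `flush_partitions` picks the logical partition.
fn partition_for(hashed_key: u64, flush_partitions: u64) -> u64 {
    // Guard against a configured value of 0, as the aggregator does with `max(1, ..)`.
    hashed_key % flush_partitions.max(1)
}
```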
@@ -1148,36 +1149,18 @@ pub struct HashedBucket

/// A message containing a vector of buckets to be flushed.
///
/// Use [`into_buckets`](Self::into_buckets) to access the raw [`Bucket`]s. Handlers must respond to
/// this message with a `Result`:
/// Handlers must respond to this message with a `Result`:
/// - If flushing has succeeded or the buckets should be dropped for any reason, respond with `Ok`.
/// - If flushing fails and should be retried at a later time, respond with `Err` containing the
/// failed buckets. They will be merged back into the aggregator and flushed at a later time.
#[derive(Clone, Debug)]
pub struct FlushBuckets {
/// the project key
project_key: ProjectKey,
buckets: Vec<Bucket>,
}

impl FlushBuckets {
/// Creates a new message by consuming a vector of buckets.
pub fn new(project_key: ProjectKey, buckets: Vec<Bucket>) -> Self {
Self {
project_key,
buckets,
}
}

/// Consumes the buckets contained in this message.
pub fn into_buckets(self) -> Vec<Bucket> {
self.buckets
}

/// Returns the project key (formally project public key)
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
/// The project key.
pub project_key: ProjectKey,
/// The logical partition to send this batch to.
pub partition_key: u64,
/// The buckets to be flushed.
pub buckets: Vec<Bucket>,
}

impl Message for FlushBuckets {
@@ -1413,7 +1396,7 @@ impl<T: Iterator<Item = Bucket>> FusedIterator for CappedBucketIter<T> {}
///
/// fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
/// // Return `Ok` to consume the buckets or `Err` to send them back
/// Err(msg.into_buckets())
/// Err(msg.buckets)
/// }
/// }
/// ```
@@ -1749,6 +1732,7 @@ impl Aggregator {
buckets: Vec<HashedBucket>,
flush_partitions: u64,
) -> BTreeMap<u64, Vec<Bucket>> {
let flush_partitions = max(1, flush_partitions); // treat a configured 0 as 1 to avoid modulo by zero
let mut partitions = BTreeMap::<u64, Vec<Bucket>>::new();
for bucket in buckets {
let partition_key = bucket.hashed_key % flush_partitions;
@@ -1763,36 +1747,32 @@
/// Split the provided buckets into batches and process each batch with the given function.
///
/// For each batch, log a histogram metric.
/// NOTE: This function can be inlined again once we are done with the dry run.
fn process_batches<F>(
&self,
buckets: impl IntoIterator<Item = Bucket>,
partition_key: Option<u64>,
partition_key: u64,
mut process: F,
) where
F: FnMut(Vec<Bucket>),
{
let capped_batches =
CappedBucketIter::new(buckets.into_iter(), self.config.max_flush_bytes);
let partition_tag = match partition_key {
Some(partition_key) => partition_key.to_string(),
None => "none".to_owned(),
};
let capped_batches: Vec<_> = capped_batches.collect();
if partition_tag != "none" {
relay_statsd::metric!(
histogram(MetricHistograms::BatchesPerPartition) = capped_batches.len() as f64,
partition_key = partition_tag.as_str(),
);
}
let partition_tag = partition_key.to_string();

for batch in capped_batches.into_iter() {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
partition_key = partition_tag.as_str(),
);
process(batch);
}
let num_batches = capped_batches
.map(|batch| {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
partition_key = partition_tag.as_str(),
);
process(batch);
})
.count();

relay_statsd::metric!(
histogram(MetricHistograms::BatchesPerPartition) = num_batches as f64,
partition_key = partition_tag.as_str(),
);
}
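
For intuition, a minimal sketch of the size-capped batching that `CappedBucketIter` performs (assumptions: the helper name and the caller-provided cost function are hypothetical; the real iterator estimates each bucket's serialized size against `max_flush_bytes`):

```rust
/// Sketch of size-capped batching (not the real `CappedBucketIter`): items are
/// grouped greedily until a byte budget would be exceeded, then a new batch starts.
fn batch_by_budget<T>(
    items: Vec<T>,
    cost: impl Fn(&T) -> usize,
    budget: usize,
) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_cost = 0;
    for item in items {
        let item_cost = cost(&item);
        // Close the current batch if adding this item would exceed the budget.
        if !current.is_empty() && current_cost + item_cost > budget {
            batches.push(std::mem::take(&mut current));
            current_cost = 0;
        }
        current_cost += item_cost;
        current.push(item);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```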

/// Sends the [`FlushBuckets`] message to the receiver.
@@ -1816,46 +1796,35 @@ impl Aggregator {
);
total_bucket_count += bucket_count;

// Simulate the behavior of partitioning buckets by logical key:
let project_buckets: Box<dyn Iterator<Item = Bucket>> = if let Some(num_partitions) =
self.config.flush_partitions
{
let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
let mut all_project_batches = Vec::<Vec<Bucket>>::new();
for (partition_key, buckets) in partitioned_buckets {
self.process_batches(buckets, Some(partition_key), |batch| {
// This is just a dry run. Put the buckets back into the vector
all_project_batches.push(batch);
});
}
Box::new(all_project_batches.into_iter().flatten())
} else {
// Simply send buckets as before.
Box::new(project_buckets.into_iter().map(|x| x.bucket))
};

// Actually flush buckets:
self.process_batches(project_buckets, None, |batch| {
let fut = self
.receiver
.send(FlushBuckets::new(project_key, batch))
.into_actor(self)
.and_then(move |result, slf, _ctx| {
if let Err(buckets) = result {
relay_log::trace!(
"returned {} buckets from receiver, merging back",
buckets.len()
);
slf.merge_all(project_key, buckets).ok();
}
fut::ok(())
})
.drop_err();

if let Some(context) = context.as_deref_mut() {
fut.spawn(context);
}
});
let num_partitions = self.config.flush_partitions.unwrap_or(1);
Inline comment from the author:
This means that we always set the X-Sentry-Relay-Shard header, even though no partitioning was requested by config. We could easily change this behavior but I thought this would keep the code nice and simple.
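
For illustration only (the request-building code is not part of this diff): the `partition_key` carried by `FlushBuckets` is what the receiver would turn into the shard header, roughly:

```rust
// Hypothetical helper; in reality the receiver of `FlushBuckets` sets this
// header on the outgoing envelope request.
fn shard_header(partition_key: u64) -> (&'static str, String) {
    ("X-Sentry-Relay-Shard", partition_key.to_string())
}
```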

let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
for (partition_key, buckets) in partitioned_buckets {
self.process_batches(buckets, partition_key, |batch| {
let fut = self
.receiver
.send(FlushBuckets {
project_key,
partition_key,
buckets: batch,
})
.into_actor(self)
.and_then(move |result, slf, _ctx| {
if let Err(buckets) = result {
relay_log::trace!(
"returned {} buckets from receiver, merging back",
buckets.len()
);
slf.merge_all(project_key, buckets).ok();
}
fut::ok(())
})
.drop_err();

if let Some(context) = context.as_deref_mut() {
fut.spawn(context);
}
});
}
}

relay_statsd::metric!(histogram(MetricHistograms::BucketsFlushed) = total_bucket_count);
@@ -2043,7 +2012,7 @@ mod tests {
type Result = Result<(), Vec<Bucket>>;

fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
let buckets = msg.into_buckets();
let buckets = msg.buckets;
relay_log::debug!("received buckets: {:#?}", buckets);
if self.reject_all {
return Err(buckets);
@@ -3062,7 +3031,8 @@ mod tests {
);
}

fn run_test_bucket_partitioning(flush_partitions: Option<u64>, expected: Vec<String>) {
#[must_use]
fn run_test_bucket_partitioning(flush_partitions: Option<u64>) -> Vec<String> {
jjbayer marked this conversation as resolved.
let config = AggregatorConfig {
max_flush_bytes: 1000,
flush_partitions,
@@ -3093,38 +3063,34 @@
aggregator.try_flush(None);
});

assert_eq!(
captures
.into_iter()
.filter(|x| x.contains("per_batch") || x.contains("batches_per_partition"))
.collect::<Vec<_>>(),
expected
);
captures
.into_iter()
.filter(|x| x.contains("per_batch") || x.contains("batches_per_partition"))
.collect::<Vec<_>>()
}

#[test]
fn test_bucket_partitioning() {
// TODO: Also test with different `max_flush_bytes`.
// It currently looks like setting a small max_flush_bytes leads to no buckets being
// flushed at all.
for (flush_partitions, expected) in [
(
None,
vec!["metrics.buckets.per_batch:2|h|#partition_key:none".to_owned()],
),
(
Some(5),
vec![
"metrics.buckets.batches_per_partition:1|h|#partition_key:0".to_owned(),
"metrics.buckets.per_batch:1|h|#partition_key:0".to_owned(),
"metrics.buckets.batches_per_partition:1|h|#partition_key:3".to_owned(),
"metrics.buckets.per_batch:1|h|#partition_key:3".to_owned(),
"metrics.buckets.per_batch:2|h|#partition_key:none".to_owned(),
],
),
] {
run_test_bucket_partitioning(flush_partitions, expected)
}
fn test_bucket_partitioning_dummy() {
let output = run_test_bucket_partitioning(None);
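// With `flush_partitions: None` the aggregator falls back to a single partition
// (`unwrap_or(1)`), so every bucket hashes to partition key 0.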
insta::assert_debug_snapshot!(output, @r###"
[
"metrics.buckets.per_batch:2|h|#partition_key:0",
"metrics.buckets.batches_per_partition:1|h|#partition_key:0",
]
"###);
}

#[test]
fn test_bucket_partitioning_128() {
let output = run_test_bucket_partitioning(Some(128));
insta::assert_debug_snapshot!(output, @r###"
[
"metrics.buckets.per_batch:1|h|#partition_key:59",
"metrics.buckets.batches_per_partition:1|h|#partition_key:59",
"metrics.buckets.per_batch:1|h|#partition_key:62",
"metrics.buckets.batches_per_partition:1|h|#partition_key:62",
]
"###);
}

fn test_capped_iter_completeness(max_flush_bytes: usize, expected_elements: usize) {