feat(metrics): Actually batch by partition, set request header (#1440)
Convert the dry run implemented in #1425 into an actual batching
mechanism that splits metrics buckets into logical partitions.

The partition_key has to be passed through ProjectCache, EnvelopeManager
and UpstreamRelay to be set as a header on the outgoing envelope
request.
jjbayer committed Aug 31, 2022
1 parent 7892216 commit 14d431f
Showing 7 changed files with 174 additions and 137 deletions.
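As a rough, self-contained sketch of what this change wires up: buckets are grouped into logical partitions by hashing their key modulo the configured partition count, and each partition's flush carries its key toward the outgoing envelope request as a header. The types and functions below are illustrative stand-ins, not Relay's actual API; only the header name `X-Sentry-Relay-Shard` and the modulo scheme are taken from the diff that follows.

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins for Relay types; only the fields needed here.
struct Bucket {
    hashed_key: u64,
}

struct FlushBuckets {
    partition_key: u64,
    buckets: Vec<Bucket>,
}

// Split buckets into logical partitions by hash, as the aggregator now does.
fn partition(buckets: Vec<Bucket>, flush_partitions: u64) -> BTreeMap<u64, Vec<Bucket>> {
    let flush_partitions = flush_partitions.max(1);
    let mut partitions: BTreeMap<u64, Vec<Bucket>> = BTreeMap::new();
    for bucket in buckets {
        partitions
            .entry(bucket.hashed_key % flush_partitions)
            .or_default()
            .push(bucket);
    }
    partitions
}

fn main() {
    let buckets = vec![
        Bucket { hashed_key: 3 },
        Bucket { hashed_key: 7 },
        Bucket { hashed_key: 8 },
    ];
    for (partition_key, buckets) in partition(buckets, 5) {
        let msg = FlushBuckets { partition_key, buckets };
        // Downstream (ProjectCache -> EnvelopeManager -> UpstreamRelay), the key
        // ends up as a request header on the outgoing envelope:
        let header = ("X-Sentry-Relay-Shard", msg.partition_key.to_string());
        println!("{} buckets -> {}: {}", msg.buckets.len(), header.0, header.1);
    }
}
```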
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
- Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410))
- Update store service to use generic Addr and minor changes to generic Addr. ([#1415](https://github.com/getsentry/relay/pull/1415))
- Added new Register for the Services that is initialized later than the current. ([#1421](https://github.com/getsentry/relay/pull/1421))
- Batch metrics buckets into logical partitions before sending them as envelopes. ([#1440](https://github.com/getsentry/relay/pull/1440))

**Features**:

202 changes: 84 additions & 118 deletions relay-metrics/src/aggregation.rs
@@ -1,3 +1,4 @@
use std::cmp::max;
use std::collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap};

use std::fmt;
@@ -940,7 +941,7 @@ pub struct AggregatorConfig {
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % flush_partitions), and routed
/// by setting the header `X-Relay-Shard`.
/// by setting the header `X-Sentry-Relay-Shard`.
pub flush_partitions: Option<u64>,

/// The age in seconds of the oldest allowed bucket timestamp.
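A minimal sketch of enabling the `flush_partitions` option documented above, assuming `AggregatorConfig` is re-exported at the crate root and implements `Default` (as the test helper later in this diff suggests):

```rust
use relay_metrics::AggregatorConfig;

fn main() {
    // 128 logical partitions; `None` keeps a single partition (key 0).
    let config = AggregatorConfig {
        flush_partitions: Some(128),
        ..Default::default()
    };
    assert_eq!(config.flush_partitions, Some(128));
}
```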
@@ -1148,36 +1149,18 @@ pub struct HashedBucket {

/// A message containing a vector of buckets to be flushed.
///
/// Use [`into_buckets`](Self::into_buckets) to access the raw [`Bucket`]s. Handlers must respond to
/// this message with a `Result`:
/// Handlers must respond to this message with a `Result`:
/// - If flushing has succeeded or the buckets should be dropped for any reason, respond with `Ok`.
/// - If flushing fails and should be retried at a later time, respond with `Err` containing the
/// failed buckets. They will be merged back into the aggregator and flushed at a later time.
#[derive(Clone, Debug)]
pub struct FlushBuckets {
/// the project key
project_key: ProjectKey,
buckets: Vec<Bucket>,
}

impl FlushBuckets {
/// Creates a new message by consuming a vector of buckets.
pub fn new(project_key: ProjectKey, buckets: Vec<Bucket>) -> Self {
Self {
project_key,
buckets,
}
}

/// Consumes the buckets contained in this message.
pub fn into_buckets(self) -> Vec<Bucket> {
self.buckets
}

    /// Returns the project key (formerly project public key)
pub fn project_key(&self) -> ProjectKey {
self.project_key
}
/// The project key.
pub project_key: ProjectKey,
/// The logical partition to send this batch to.
pub partition_key: u64,
/// The buckets to be flushed.
pub buckets: Vec<Bucket>,
}

impl Message for FlushBuckets {
@@ -1413,7 +1396,7 @@ impl<T: Iterator<Item = Bucket>> FusedIterator for CappedBucketIter<T> {}
///
/// fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
/// // Return `Ok` to consume the buckets or `Err` to send them back
/// Err(msg.into_buckets())
/// Err(msg.buckets)
/// }
/// }
/// ```
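A fuller sketch of the same receiver contract, modeled on the test receiver further down in this diff and assuming the actix `Actor`/`Handler` setup those tests use, with `Bucket` and `FlushBuckets` taken from `relay_metrics`:

```rust
use actix::{Actor, Context, Handler};
use relay_metrics::{Bucket, FlushBuckets};

/// A minimal receiver honoring the contract above: `Ok` consumes the buckets,
/// `Err` hands them back so the aggregator merges and retries them later.
struct BucketReceiver {
    reject_all: bool,
}

impl Actor for BucketReceiver {
    type Context = Context<Self>;
}

impl Handler<FlushBuckets> for BucketReceiver {
    type Result = Result<(), Vec<Bucket>>;

    fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
        if self.reject_all {
            // Reject: the buckets travel back to the aggregator for a later flush.
            return Err(msg.buckets);
        }
        // Accept: forward `msg.buckets` upstream, tagged with `msg.partition_key`.
        relay_log::debug!(
            "flushing {} buckets for partition {}",
            msg.buckets.len(),
            msg.partition_key
        );
        Ok(())
    }
}
```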
@@ -1749,6 +1732,7 @@ impl Aggregator {
buckets: Vec<HashedBucket>,
flush_partitions: u64,
) -> BTreeMap<u64, Vec<Bucket>> {
let flush_partitions = max(1, flush_partitions); // handle 0
let mut partitions = BTreeMap::<u64, Vec<Bucket>>::new();
for bucket in buckets {
let partition_key = bucket.hashed_key % flush_partitions;
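The `max(1, flush_partitions)` guard above means a configured value of `0` degrades gracefully to a single partition instead of a modulo-by-zero panic. The same property restated in isolation (plain Rust, independent of the aggregator):

```rust
/// Mirrors the guard above: a partition count of 0 behaves like 1.
fn partition_key(hashed_key: u64, flush_partitions: u64) -> u64 {
    hashed_key % flush_partitions.max(1)
}

#[test]
fn zero_partitions_behave_like_one() {
    for hashed_key in [0, 1, 42, u64::MAX] {
        assert_eq!(partition_key(hashed_key, 0), 0);
        assert_eq!(partition_key(hashed_key, 0), partition_key(hashed_key, 1));
    }
}
```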
@@ -1763,36 +1747,32 @@
/// Split the provided buckets into batches and process each batch with the given function.
///
/// For each batch, log a histogram metric.
/// NOTE: This function can be inlined again once we are done with the dry run.
fn process_batches<F>(
&self,
buckets: impl IntoIterator<Item = Bucket>,
partition_key: Option<u64>,
partition_key: u64,
mut process: F,
) where
F: FnMut(Vec<Bucket>),
{
let capped_batches =
CappedBucketIter::new(buckets.into_iter(), self.config.max_flush_bytes);
let partition_tag = match partition_key {
Some(partition_key) => partition_key.to_string(),
None => "none".to_owned(),
};
let capped_batches: Vec<_> = capped_batches.collect();
if partition_tag != "none" {
relay_statsd::metric!(
histogram(MetricHistograms::BatchesPerPartition) = capped_batches.len() as f64,
partition_key = partition_tag.as_str(),
);
}
let partition_tag = partition_key.to_string();

for batch in capped_batches.into_iter() {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
partition_key = partition_tag.as_str(),
);
process(batch);
}
let num_batches = capped_batches
.map(|batch| {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
partition_key = partition_tag.as_str(),
);
process(batch);
})
.count();

relay_statsd::metric!(
histogram(MetricHistograms::BatchesPerPartition) = num_batches as f64,
partition_key = partition_tag.as_str(),
);
}
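The rewritten `process_batches` runs the side effect for each capped batch inside `map` and then drives the iterator with `count`, so the number of batches falls out of the same pass that emits the per-batch histogram. The same idiom in isolation, using generic placeholder types rather than Relay's:

```rust
/// Run `process` on every batch and return how many batches there were,
/// in a single pass over the iterator.
fn process_all<T, F: FnMut(Vec<T>)>(
    batches: impl IntoIterator<Item = Vec<T>>,
    mut process: F,
) -> usize {
    batches.into_iter().map(|batch| process(batch)).count()
}

fn main() {
    let mut total_items = 0;
    let num_batches = process_all(vec![vec![1, 2], vec![3]], |batch| total_items += batch.len());
    assert_eq!((num_batches, total_items), (2, 3));
    println!("{num_batches} batches, {total_items} items");
}
```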

/// Sends the [`FlushBuckets`] message to the receiver.
@@ -1816,46 +1796,35 @@ impl Aggregator {
);
total_bucket_count += bucket_count;

// Simulate the behavior of partitioning buckets by logical key:
let project_buckets: Box<dyn Iterator<Item = Bucket>> = if let Some(num_partitions) =
self.config.flush_partitions
{
let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
let mut all_project_batches = Vec::<Vec<Bucket>>::new();
for (partition_key, buckets) in partitioned_buckets {
self.process_batches(buckets, Some(partition_key), |batch| {
// This is just a dry run. Put the buckets back into the vector
all_project_batches.push(batch);
});
}
Box::new(all_project_batches.into_iter().flatten())
} else {
// Simply send buckets as before.
Box::new(project_buckets.into_iter().map(|x| x.bucket))
};

// Actually flush buckets:
self.process_batches(project_buckets, None, |batch| {
let fut = self
.receiver
.send(FlushBuckets::new(project_key, batch))
.into_actor(self)
.and_then(move |result, slf, _ctx| {
if let Err(buckets) = result {
relay_log::trace!(
"returned {} buckets from receiver, merging back",
buckets.len()
);
slf.merge_all(project_key, buckets).ok();
}
fut::ok(())
})
.drop_err();

if let Some(context) = context.as_deref_mut() {
fut.spawn(context);
}
});
let num_partitions = self.config.flush_partitions.unwrap_or(1);
let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
for (partition_key, buckets) in partitioned_buckets {
self.process_batches(buckets, partition_key, |batch| {
let fut = self
.receiver
.send(FlushBuckets {
project_key,
partition_key,
buckets: batch,
})
.into_actor(self)
.and_then(move |result, slf, _ctx| {
if let Err(buckets) = result {
relay_log::trace!(
"returned {} buckets from receiver, merging back",
buckets.len()
);
slf.merge_all(project_key, buckets).ok();
}
fut::ok(())
})
.drop_err();

if let Some(context) = context.as_deref_mut() {
fut.spawn(context);
}
});
}
}

relay_statsd::metric!(histogram(MetricHistograms::BucketsFlushed) = total_bucket_count);
@@ -2043,7 +2012,7 @@ mod tests {
type Result = Result<(), Vec<Bucket>>;

fn handle(&mut self, msg: FlushBuckets, _ctx: &mut Self::Context) -> Self::Result {
let buckets = msg.into_buckets();
let buckets = msg.buckets;
relay_log::debug!("received buckets: {:#?}", buckets);
if self.reject_all {
return Err(buckets);
@@ -3062,7 +3031,8 @@ mod tests {
);
}

fn run_test_bucket_partitioning(flush_partitions: Option<u64>, expected: Vec<String>) {
#[must_use]
fn run_test_bucket_partitioning(flush_partitions: Option<u64>) -> Vec<String> {
let config = AggregatorConfig {
max_flush_bytes: 1000,
flush_partitions,
@@ -3093,38 +3063,34 @@
aggregator.try_flush(None);
});

assert_eq!(
captures
.into_iter()
.filter(|x| x.contains("per_batch") || x.contains("batches_per_partition"))
.collect::<Vec<_>>(),
expected
);
captures
.into_iter()
.filter(|x| x.contains("per_batch") || x.contains("batches_per_partition"))
.collect::<Vec<_>>()
}

#[test]
fn test_bucket_partitioning() {
// TODO: Also test with different `max_flush_bytes`.
// It currently looks like setting a small max_flush_bytes leads to no buckets being
// flushed at all.
for (flush_partitions, expected) in [
(
None,
vec!["metrics.buckets.per_batch:2|h|#partition_key:none".to_owned()],
),
(
Some(5),
vec![
"metrics.buckets.batches_per_partition:1|h|#partition_key:0".to_owned(),
"metrics.buckets.per_batch:1|h|#partition_key:0".to_owned(),
"metrics.buckets.batches_per_partition:1|h|#partition_key:3".to_owned(),
"metrics.buckets.per_batch:1|h|#partition_key:3".to_owned(),
"metrics.buckets.per_batch:2|h|#partition_key:none".to_owned(),
],
),
] {
run_test_bucket_partitioning(flush_partitions, expected)
}
fn test_bucket_partitioning_dummy() {
let output = run_test_bucket_partitioning(None);
insta::assert_debug_snapshot!(output, @r###"
[
"metrics.buckets.per_batch:2|h|#partition_key:0",
"metrics.buckets.batches_per_partition:1|h|#partition_key:0",
]
"###);
}

#[test]
fn test_bucket_partitioning_128() {
let output = run_test_bucket_partitioning(Some(128));
insta::assert_debug_snapshot!(output, @r###"
[
"metrics.buckets.per_batch:1|h|#partition_key:59",
"metrics.buckets.batches_per_partition:1|h|#partition_key:59",
"metrics.buckets.per_batch:1|h|#partition_key:62",
"metrics.buckets.batches_per_partition:1|h|#partition_key:62",
]
"###);
}

fn test_capped_iter_completeness(max_flush_bytes: usize, expected_elements: usize) {