Skip to content

Commit

Permalink
feat: Implement dry run (untested)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Aug 18, 2022
1 parent 46879b6 commit a9d9390
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 29 deletions.
60 changes: 31 additions & 29 deletions relay-metrics/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,10 +1144,11 @@ impl Ord for QueuedBucket {
}
}

// A Bucket and its hashed key.
// This is cheaper to pass around than a (BucketKey, Bucket) pair.
// TODO: Find better name for this
struct HashedBucket {
/// A Bucket and its hashed key.
/// This is cheaper to pass around than a (BucketKey, Bucket) pair.
pub struct HashedBucket {
// This is only public because pop_flush_buckets is used in benchmark.
// TODO: Find better name for this struct
hashed_key: u32,
bucket: Bucket,
}
Expand Down Expand Up @@ -1751,39 +1752,38 @@ impl Aggregator {
}

/// Split buckets into N logical partitions, determined by the bucket key.
fn partition_buckets(&self, buckets: Vec<HashedBucket>) -> BTreeMap<u32, Vec<HashedBucket>> {
let flush_partitions = match self.config.flush_partitions {
Some(n) => n,
None => {
// This is not intended to happen.
return BTreeMap::from([(0, buckets)]);
}
};
let partitions = BTreeMap::<u32, Vec<HashedBucket>>::new();
fn partition_buckets(
&self,
buckets: Vec<HashedBucket>,
flush_partitions: u32,
) -> BTreeMap<u32, Vec<Bucket>> {
let mut partitions = BTreeMap::<u32, Vec<Bucket>>::new();
for bucket in buckets {
let partition_key = bucket.hashed_key % flush_partitions;
partitions.entry(partition_key).or_default().push(bucket);
partitions
.entry(partition_key)
.or_default()
.push(bucket.bucket);
}
partitions
}

fn process_batches<F>(
&self,
buckets: Vec<HashedBucket>,
partition_key: Option<usize>,
process: F,
) where
F: FnOnce(Vec<HashedBucket>),
/// Split the provided buckets into batches and process each batch with the given function.
///
/// For each batch, log a histogram metric.
fn process_batches<F>(&self, buckets: Vec<Bucket>, partition_key: Option<u32>, mut process: F)
where
F: FnMut(Vec<Bucket>),
{
let capped_batches = CappedBucketIter::new(buckets, self.config.max_flush_bytes);
let partition_tag = match partition_key {
Some(partition_key) => format!("{partition_key}").as_str(),
None => "none",
Some(partition_key) => format!("{partition_key}"),
None => "none".to_owned(),
};
for (i, batch) in capped_batches.enumerate() {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
partition_key = partition_tag,
partition_key = partition_tag.as_str(),
batch_index = format!("{i}").as_str(),
);
process(batch);
Expand Down Expand Up @@ -1813,9 +1813,8 @@ impl Aggregator {

// Simulate the behavior of partitioning buckets by logical key:
let project_buckets = if let Some(num_partitions) = self.config.flush_partitions {
let partitioned_buckets = self.partition_buckets(project_buckets);
// TODO: partition_bucket could already return Bucket instead of HashedBucket,
let restored_project_buckets = Vec::new();
let partitioned_buckets = self.partition_buckets(project_buckets, num_partitions);
let mut restored_project_buckets = Vec::new();
for (partition_key, buckets) in partitioned_buckets {
self.process_batches(buckets, Some(partition_key), |batch| {
// This is just a dry run. Put the buckets back into the vector
Expand All @@ -1824,8 +1823,10 @@ impl Aggregator {
}
restored_project_buckets
} else {
// Nothing changes.
project_buckets
// Simply send buckets as before.
// TODO: Make process_batches and CappedIterator accepted an Iterator as input,
// to make this allocation unnecessary.
project_buckets.into_iter().map(|x| x.bucket).collect()
};

// Actually flush buckets:
Expand Down Expand Up @@ -2056,6 +2057,7 @@ mod tests {
max_tag_value_length: 200,
max_project_key_bucket_bytes: None,
max_total_bucket_bytes: None,
..Default::default()
}
}

Expand Down
1 change: 1 addition & 0 deletions relay-metrics/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl HistogramMetric for MetricHistograms {
Self::BucketsFlushed => "metrics.buckets.flushed",
Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project",
Self::BucketsDelay => "metrics.buckets.delay",
Self::BucketsPerBatch => "metrics.buckets.per_batch",
}
}
}
Expand Down

0 comments on commit a9d9390

Please sign in to comment.