Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust-version.workspace = true
edition.workspace = true
version.workspace = true
license.workspace = true
autobenches = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -26,5 +27,11 @@ datadog-ddsketch = { path = "../ddsketch"}
[lib]
bench = false

[[bench]]
name = "main"
harness = false
path = "benches/main.rs"

[dev-dependencies]
criterion = "0.5.1"
rand = "0.8.5"
7 changes: 7 additions & 0 deletions data-pipeline/benches/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use criterion::criterion_main;

// Benchmarks for the span concentrator are defined in a sibling module file.
mod span_concentrator_bench;

// Generates the `main` function of this bench target, running the `benches` group exported by
// the module declared above.
criterion_main!(span_concentrator_bench::benches);
72 changes: 72 additions & 0 deletions data-pipeline/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use std::{
collections::HashMap,
time::{self, Duration, SystemTime},
};

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_protobuf::pb;

/// Returns the timestamp located `10 * n` seconds after `now`, expressed as nanoseconds elapsed
/// since the Unix epoch.
///
/// # Panics
///
/// Panics if `now` is earlier than the Unix epoch.
fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
    let since_epoch = now.duration_since(time::UNIX_EPOCH).unwrap();
    let offset = Duration::from_secs(10 * n);
    (since_epoch + offset).as_nanos() as i64
}

/// Builds a synthetic span used as benchmark input.
///
/// Every span carries the `_dd.measured` metric and a `db_name` meta entry. The span whose
/// `span_id` is 1 additionally carries `_dd.top_level`, and spans whose id is a multiple of 3
/// additionally carry a `bucket_s3` meta entry. The resource name is derived from `trace_id`, the
/// error flag alternates with the parity of `span_id`, and the start time is `now` shifted by
/// `10 * trace_id` seconds.
fn get_span(now: SystemTime, trace_id: u64, span_id: u64) -> pb::Span {
    let mut span_metrics = HashMap::new();
    span_metrics.insert(String::from("_dd.measured"), 1.0);
    if span_id == 1 {
        span_metrics.insert(String::from("_dd.top_level"), 1.0);
    }

    let mut span_meta = HashMap::new();
    span_meta.insert(String::from("db_name"), String::from("postgres"));
    if span_id % 3 == 0 {
        span_meta.insert(String::from("bucket_s3"), String::from("aws_bucket"));
    }

    // Durations are kept strictly below ten seconds.
    let ten_seconds_ns = Duration::from_secs(10).as_nanos() as i64;

    pb::Span {
        service: String::from("test-service"),
        name: String::from("test-name"),
        resource: format!("test-{trace_id}"),
        trace_id,
        span_id,
        parent_id: span_id - 1,
        start: get_bucket_start(now, trace_id),
        duration: span_id as i64 % ten_seconds_ns,
        error: (span_id % 2) as i32,
        meta: span_meta,
        metrics: span_metrics,
        ..Default::default()
    }
}

/// Benchmarks adding a fixed set of generated spans to a `SpanConcentrator`.
///
/// A template concentrator and a template span list are built once. For each measured run both
/// are cloned in the setup closure, so only the `add_span` calls are inside the timed routine.
pub fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("concentrator");

    // Reference time 1000 seconds in the past; span start times are offsets from it.
    let now = SystemTime::now() - Duration::from_secs(10 * 100);
    let template_concentrator = SpanConcentrator::new(
        Duration::from_secs(10),
        now,
        true,
        true,
        vec![String::from("db_name"), String::from("bucket_s3")],
    );

    // 99 traces of 99 spans each, with ids starting at 1.
    let template_spans: Vec<_> = (1..100)
        .flat_map(|trace_id| (1..100).map(move |span_id| get_span(now, trace_id, span_id)))
        .collect();

    group.bench_function("add_spans_to_concentrator", |bencher| {
        bencher.iter_batched_ref(
            || (template_concentrator.clone(), template_spans.clone()),
            |(concentrator, spans)| {
                for span in spans.iter() {
                    concentrator.add_span(span);
                }
            },
            criterion::BatchSize::LargeInput,
        );
    });

    // Equivalent to letting the group drop at the end of the scope, but explicit.
    group.finish();
}
// Defines the `benches` benchmark group, made up of `criterion_benchmark`.
criterion_group!(benches, criterion_benchmark);
2 changes: 1 addition & 1 deletion data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

mod span_concentrator;
pub mod span_concentrator;
pub mod trace_exporter;
4 changes: 2 additions & 2 deletions data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn get_peer_tags(span: &pb::Span, peer_tag_keys: &[String]) -> Vec<Tag> {
}

/// The stats computed from a group of spans with the same AggregationKey
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(super) struct GroupedStats {
hits: u64,
errors: u64,
Expand Down Expand Up @@ -140,7 +140,7 @@ impl GroupedStats {

/// A time bucket used for stats aggregation. It stores a map of GroupedStats storing the stats of
/// spans aggregated on their AggregationKey.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(super) struct StatsBucket {
data: HashMap<AggregationKey, GroupedStats>,
start: u64,
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn should_ignore_span(span: &pb::Span, compute_stats_by_span_kind: bool) -> bool
/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and removes
/// all older buckets, returning their content. When using force flush all buckets are flushed
/// regardless of their age.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SpanConcentrator {
/// Size of the time buckets used for aggregation in nanos
bucket_size: u64,
Expand Down