Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
132060a
Serverless Trace Mini Agent (#124)
thedavl Apr 4, 2023
1c791bf
Mini Agent: add trace normalization, root span calculation, logging (…
thedavl Apr 10, 2023
b7f8694
Mini Agent: Modify flushing logic, Serverless root span tags (#133)
thedavl Apr 25, 2023
a5bafc5
Mini Agent: Trace stats (#139)
thedavl Apr 28, 2023
8344dc1
Mini Agent: Verify GCP Env (#142)
thedavl May 1, 2023
8f43c43
Fix env_verifier + protobuf gitlab build issues (#148)
thedavl May 2, 2023
cd6eee2
Mini Agent: Support DD_SITE (#149)
thedavl May 3, 2023
194aa97
Mini Agent: Support env var set custom trace & trace stats intake url…
thedavl Jun 1, 2023
5bc563c
Mini Agent: Verify azure function env w/ filesystem (#174)
thedavl Jun 13, 2023
f73ef13
support dotnet (#177)
thedavl Jun 13, 2023
daeb839
Mini Agent: Integrate tag replacement (#195)
thedavl Jul 19, 2023
a04191c
Implement sending via sidecar (#192)
bwoebi Aug 4, 2023
40398c4
Refactor common unit test create_test_span function into trace-utils …
thedavl Aug 25, 2023
b056a8e
Http url string obfuscation (#228)
thedavl Sep 8, 2023
dee5f9f
Run Miri everywhere (#269)
danielsn Oct 6, 2023
0f7ded5
Reformat License and add SPDX headers
bantonsson Mar 5, 2024
97d7e94
Add comment wrapping and lenghts to rustfmt
bantonsson Mar 5, 2024
9c53e5b
Make verify timeout test in datadog-trace-mini-agent trigger timeout
bantonsson Apr 16, 2024
b2d6599
Add descriptions to cargo.toml files, mainly to help discover the repo
pierotibou May 15, 2024
57979cc
Collect some trace_api.* metrics in the trace flusher
iamluc May 7, 2024
c0c59ef
Refactor trace_utils.rs into smaller units.
hoolioh May 13, 2024
df0427a
adds azure app service tags to serverless mini agent traces for azure…
duncanpharvey May 29, 2024
b861ae9
refactor logic for setting azure span tags in serverless mini agent
duncanpharvey May 30, 2024
97b4ff8
apply formatting
duncanpharvey May 30, 2024
0266505
add mini agent version to config
duncanpharvey May 31, 2024
0a35e9c
fix unit test
duncanpharvey May 31, 2024
f421ac1
apply formatting
duncanpharvey May 31, 2024
43820fe
Add 'src_library' tag to metrics collect in the sidecar.
hoolioh May 31, 2024
34aa923
Merge branch 'main' into duncan-harvey/serverless-mini-agent-azure-ap…
duncanpharvey Jun 3, 2024
c2892d6
bump serverless mini agent version to 0.4.0
duncanpharvey Jun 3, 2024
67d6e2c
move trace_util test helpers behind a feature flag (#461)
ekump Jun 4, 2024
1247251
Skip normalization & obfuscation and coalesce instead (#475)
bwoebi Jun 7, 2024
41789b6
bump version of datadog-trace-mini-agent to 0.4.2 (#502)
duncanpharvey Jun 24, 2024
59ad03a
Add support for sending v0.4 traces. (#491)
hoolioh Jun 25, 2024
01978f5
Support v0.5 trace endpoint (#505)
astuyve Jun 27, 2024
f19bac7
Enable all benchmarks
bantonsson Jul 9, 2024
babc5ae
bump versions of datadog-serverless-trace-mini-agent and datadog-trac…
duncanpharvey Jul 11, 2024
9b7604c
ekump/APMSP-1279 benchmark trace exporting (#531)
ekump Jul 17, 2024
ce790da
Implement timeouts for both sending traces and telemetry. (#518)
hoolioh Jul 18, 2024
c2d9602
Fixes to make clippy version 1.80 happy (#550)
taegyunkim Jul 25, 2024
932a157
[Serverless Mini Agent] Add _dd.mini_agent_version tag to all spans f…
duncanpharvey Jul 31, 2024
d82e016
[Serverless Mini Agent] Run in Azure Spring Apps (#547)
duncanpharvey Jul 31, 2024
208edec
[SVLS-5049] It is okay to have a stats payload without stats (#567)
apiarian-datadog Aug 5, 2024
78aabe5
[Serverless Mini Agent] Use DogStatsD in Serverless (#616)
duncanpharvey Sep 13, 2024
0bd2b17
reduced allocation v04 span representation (#598)
ekump Sep 19, 2024
69d25e7
Support http-proxy for trace agent, remove proxy from dsd (#658)
astuyve Oct 2, 2024
58e1424
[Serverless Mini Agent] Add Span Tags for Azure Spring Apps (#672)
duncanpharvey Oct 15, 2024
dd7f2b8
feat: Prefer DD_PROXY_HTTPS over HTTPS_PROXY (#673)
astuyve Oct 16, 2024
c2c6b1b
Enable backports/deprecated features on hyper crate
ivoanjo Oct 17, 2024
48a976e
Fix all compatibility warnings emitted by hyper crate
ivoanjo Oct 18, 2024
2fa4a79
Refactor proxy handling.
hoolioh Nov 11, 2024
ad3a549
[chore] Use elapsed() if possible when calculating durations (#750)
danielsn Nov 20, 2024
3877fab
fix(alloc): potentially dangling temporary (#772)
morrisonlevi Dec 2, 2024
8067923
[trace-mini-agent] change logging crate (#799)
duncanista Dec 13, 2024
696e009
change logs to be `debug` (#806)
duncanista Dec 16, 2024
5d83ae2
Add _dd.gcrfx.resource_name to serverless compatibility layer for Go…
nina9753 Dec 18, 2024
8f4dd0a
ekump/APMSP-1756 add trace exporter integration tests (#897)
ekump Feb 26, 2025
cd118e0
[trace-mini-agent] add trace aggregator (#907)
duncanista Mar 6, 2025
bfd47b9
Add v05 support in the trace exporter (#898)
hoolioh Mar 10, 2025
20f0c5f
ekump/APMSP-1827 add warnings for panics (#915)
ekump Mar 14, 2025
fdd0c19
refactor(trace-utils): make flate2 and and hyper-proxy dependencies o…
paullegranddc Mar 17, 2025
db2fe68
chore: hyper 1.x migration (#946)
paullegranddc Mar 21, 2025
94f84ec
refactor: split collect_chunks into two methods (#1021)
paullegranddc Apr 16, 2025
1b01d1f
Add 'crates/trace-mini-agent/' from commit '94f84ec7b574009bef6bf8d4a…
duncanpharvey Apr 16, 2025
b59a137
rename to trace-agent, update version, pin dependencies with libdatad…
duncanpharvey Apr 16, 2025
246c991
remove description
duncanpharvey Apr 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,336 changes: 2,240 additions & 96 deletions Cargo.lock

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions crates/trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "trace-agent"
version = "0.1.0"
license.workspace = true
edition.workspace = true

[lib]
bench = false

[dependencies]
anyhow = "1.0"
hyper = { version = "1.6", features = ["http1", "client", "server"] }
hyper-util = {version = "0.1", features = ["service"] }
tower = { version = "0.5.2", features = ["util"] }
http-body-util = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"]}
async-trait = "0.1.64"
tracing = { version = "0.1", default-features = false }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0"
ddcommon = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" }
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" }
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d", features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" }

[dev-dependencies]
rmp-serde = "1.1.1"
serial_test = "2.0.0"
duplicate = "0.4.1"
tempfile = "3.3.0"
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d", features=["test-utils"] }
171 changes: 171 additions & 0 deletions crates/trace-agent/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use datadog_trace_utils::send_data::SendData;
use std::collections::VecDeque;

/// Maximum content size per payload uncompressed in bytes,
/// that the Datadog Trace API accepts. The value is 3.2 MB.
///
/// <https://github.com/DataDog/datadog-agent/blob/9d57c10a9eeb3916e661d35dbd23c6e36395a99d/pkg/trace/writer/trace.go#L27-L31>
pub const MAX_CONTENT_SIZE_BYTES: usize = (32 * 1_024 * 1_024) / 10;

#[allow(clippy::module_name_repetitions)]
pub struct TraceAggregator {
queue: VecDeque<SendData>,
max_content_size_bytes: usize,
buffer: Vec<SendData>,
}

impl Default for TraceAggregator {
fn default() -> Self {
TraceAggregator {
queue: VecDeque::new(),
max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
buffer: Vec::new(),
}
}
}

impl TraceAggregator {
#[allow(dead_code)]
#[allow(clippy::must_use_candidate)]
pub fn new(max_content_size_bytes: usize) -> Self {
TraceAggregator {
queue: VecDeque::new(),
max_content_size_bytes,
buffer: Vec::new(),
}
}

pub fn add(&mut self, p: SendData) {
self.queue.push_back(p);
}

pub fn get_batch(&mut self) -> Vec<SendData> {
let mut batch_size = 0;

// Fill the batch
while batch_size < self.max_content_size_bytes {
if let Some(payload) = self.queue.pop_front() {
let payload_size = payload.len();

// Put stats back in the queue
if batch_size + payload_size > self.max_content_size_bytes {
self.queue.push_front(payload);
break;
}
batch_size += payload_size;
self.buffer.push(payload);
} else {
break;
}
}

std::mem::take(&mut self.buffer)
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use datadog_trace_utils::{
trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
};
use ddcommon::Endpoint;

use super::*;

#[test]
fn test_add() {
let mut aggregator = TraceAggregator::default();
let tracer_header_tags = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
lang_vendor: "lang_vendor",
tracer_version: "tracer_version",
container_id: "container_id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);

aggregator.add(payload.clone());
assert_eq!(aggregator.queue.len(), 1);
assert_eq!(aggregator.queue[0].is_empty(), payload.is_empty());
}

#[test]
fn test_get_batch() {
let mut aggregator = TraceAggregator::default();
let tracer_header_tags = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
lang_vendor: "lang_vendor",
tracer_version: "tracer_version",
container_id: "container_id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);

aggregator.add(payload.clone());
assert_eq!(aggregator.queue.len(), 1);
let batch = aggregator.get_batch();
assert_eq!(batch.len(), 1);
}

#[test]
fn test_get_batch_full_entries() {
let mut aggregator = TraceAggregator::new(2);
let tracer_header_tags = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
lang_vendor: "lang_vendor",
tracer_version: "tracer_version",
container_id: "container_id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);

// Add 3 payloads
aggregator.add(payload.clone());
aggregator.add(payload.clone());
aggregator.add(payload.clone());

// The batch should only contain the first 2 payloads
let first_batch = aggregator.get_batch();
assert_eq!(first_batch.len(), 2);
assert_eq!(aggregator.queue.len(), 1);

// The second batch should only contain the last log
let second_batch = aggregator.get_batch();
assert_eq!(second_batch.len(), 1);
assert_eq!(aggregator.queue.len(), 0);
}
}
Loading
Loading