From 819f1085faa1bed6e2fa247e620e7ca3ee5d797f Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Fri, 24 Oct 2025 09:48:11 -0400 Subject: [PATCH 1/2] trace aggregator service --- bottlecap/src/bin/bottlecap/main.rs | 11 +- bottlecap/src/traces/mod.rs | 1 + bottlecap/src/traces/trace_agent.rs | 10 +- .../src/traces/trace_aggregator_service.rs | 155 ++++++++++++++++++ bottlecap/src/traces/trace_flusher.rs | 42 ++--- 5 files changed, 187 insertions(+), 32 deletions(-) create mode 100644 bottlecap/src/traces/trace_aggregator_service.rs diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index b71acc6ab..7ffd9564c 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -64,7 +64,8 @@ use bottlecap::{ stats_flusher::{self, StatsFlusher}, stats_generator::StatsGenerator, stats_processor, trace_agent, - trace_aggregator::{self, SendDataBuilderInfo}, + trace_aggregator::SendDataBuilderInfo, + trace_aggregator_service::AggregatorService as TraceAggregatorService, trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, trace_processor::{self, SendingTraceProcessor}, }, @@ -1006,9 +1007,11 @@ fn start_trace_agent( let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); // Traces - let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default())); + let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default(); + tokio::spawn(trace_aggregator_service.run()); + let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( - trace_aggregator.clone(), + trace_aggregator_handle.clone(), config.clone(), api_key_factory.clone(), )); @@ -1037,7 +1040,7 @@ fn start_trace_agent( let trace_agent = trace_agent::TraceAgent::new( Arc::clone(config), - trace_aggregator, + trace_aggregator_handle, trace_processor.clone(), stats_aggregator, stats_processor, diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 5ee4562b4..aee6fa2b1 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -13,6 +13,7 @@ pub mod stats_generator; pub mod stats_processor; pub mod trace_agent; pub mod trace_aggregator; +pub mod trace_aggregator_service; pub mod trace_flusher; pub mod trace_processor; diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 1ba2685ed..7814f3e49 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -36,7 +36,8 @@ use crate::{ stats_aggregator, stats_generator::StatsGenerator, stats_processor, - trace_aggregator::{self, SendDataBuilderInfo}, + trace_aggregator::SendDataBuilderInfo, + trace_aggregator_service::AggregatorHandle, trace_processor, }, }; @@ -123,7 +124,7 @@ impl TraceAgent { #[allow(clippy::too_many_arguments)] pub fn new( config: Arc, - trace_aggregator: Arc>, + aggregator_handle: AggregatorHandle, trace_processor: Arc, stats_aggregator: Arc>, stats_processor: Arc, @@ -142,8 +143,9 @@ impl TraceAgent { // Start the trace aggregator, which receives and buffers trace payloads to be consumed by the trace flusher. tokio::spawn(async move { while let Some(tracer_payload_info) = trace_rx.recv().await { - let mut aggregator = trace_aggregator.lock().await; - aggregator.add(tracer_payload_info); + if let Err(e) = aggregator_handle.insert_payload(tracer_payload_info) { + error!("TRACE_AGENT | Failed to insert payload into aggregator: {e}"); + } } }); diff --git a/bottlecap/src/traces/trace_aggregator_service.rs b/bottlecap/src/traces/trace_aggregator_service.rs new file mode 100644 index 000000000..ac77a5748 --- /dev/null +++ b/bottlecap/src/traces/trace_aggregator_service.rs @@ -0,0 +1,155 @@ +use datadog_trace_utils::send_data::SendDataBuilder; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error}; + +use crate::traces::trace_aggregator::{ + MAX_CONTENT_SIZE_BYTES, SendDataBuilderInfo, TraceAggregator, +}; + +pub enum AggregatorCommand { + InsertPayload(SendDataBuilderInfo), + GetBatches(oneshot::Sender>>), + Clear, + Shutdown, +} + +#[derive(Clone, Debug)] +pub struct AggregatorHandle { + tx: mpsc::UnboundedSender, +} + +impl AggregatorHandle { + pub fn insert_payload( + &self, + payload_info: SendDataBuilderInfo, + ) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::InsertPayload(payload_info)) + } + + pub async fn get_batches(&self) -> Result>, String> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::GetBatches(response_tx)) + .map_err(|e| format!("Failed to send flush command: {e}"))?; + + response_rx + .await + .map_err(|e| format!("Failed to receive flush response: {e}")) + } + + pub fn clear(&self) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::Clear) + } + + pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::Shutdown) + } +} + +pub struct AggregatorService { + aggregator: TraceAggregator, + rx: mpsc::UnboundedReceiver, +} + +impl AggregatorService { + #[must_use] + #[allow(clippy::should_implement_trait)] + pub fn default() -> (Self, AggregatorHandle) { + Self::new(MAX_CONTENT_SIZE_BYTES) + } + + #[must_use] + pub fn new(max_content_size_bytes: usize) -> (Self, AggregatorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let aggregator = TraceAggregator::new(max_content_size_bytes); + + let service = Self { aggregator, rx }; + let handle = AggregatorHandle { tx }; + + (service, handle) + } + + pub async fn run(mut self) { + debug!("Trace aggregator service started"); + + while let Some(command) = self.rx.recv().await { + match command { + AggregatorCommand::InsertPayload(payload_info) => { + self.aggregator.add(payload_info); + } + AggregatorCommand::GetBatches(response_tx) => { + let mut batches = Vec::new(); + let mut current_batch = self.aggregator.get_batch(); + while !current_batch.is_empty() { + batches.push(current_batch); + current_batch = self.aggregator.get_batch(); + } + if response_tx.send(batches).is_err() { + error!("Failed to send trace flush response - receiver dropped"); + } + } + AggregatorCommand::Clear => { + self.aggregator.clear(); + } + AggregatorCommand::Shutdown => { + debug!("Trace aggregator service shutting down"); + break; + } + } + } + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use datadog_trace_utils::{ + trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection, + }; + use ddcommon::Endpoint; + + #[tokio::test] + async fn test_aggregator_service_insert_and_flush() { + let (service, handle) = AggregatorService::default(); + + let service_handle = tokio::spawn(async move { + service.run().await; + }); + + let tracer_header_tags = TracerHeaderTags { + lang: "lang", + lang_version: "lang_version", + lang_interpreter: "lang_interpreter", + lang_vendor: "lang_vendor", + tracer_version: "tracer_version", + container_id: "container_id", + client_computed_top_level: true, + client_computed_stats: true, + dropped_p0_traces: 0, + dropped_p0_spans: 0, + }; + let size = 1; + let payload = SendDataBuilder::new( + size, + TracerPayloadCollection::V07(Vec::new()), + tracer_header_tags, + &Endpoint::from_slice("localhost"), + ); + + handle + .insert_payload(SendDataBuilderInfo::new(payload, size)) + .unwrap(); + + let batches = handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].len(), 1); + + handle + .shutdown() + .expect("Failed to shutdown aggregator service"); + service_handle + .await + .expect("Aggregator service task failed"); + } +} diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 890cd6abe..bb7186d09 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -2,28 +2,26 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; -use ddcommon::Endpoint; -use std::str::FromStr; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::task::JoinSet; -use tracing::{debug, error}; - use datadog_trace_utils::{ config_utils::trace_intake_url_prefixed, send_data::SendDataBuilder, trace_utils::{self, SendData}, }; +use ddcommon::Endpoint; use dogstatsd::api_key::ApiKeyFactory; +use std::str::FromStr; +use std::sync::Arc; +use tokio::task::JoinSet; +use tracing::{debug, error}; use crate::config::Config; use crate::lifecycle::invocation::processor::S_TO_MS; -use crate::traces::trace_aggregator::TraceAggregator; +use crate::traces::trace_aggregator_service::AggregatorHandle; #[async_trait] pub trait TraceFlusher { fn new( - aggregator: Arc>, + aggregator_handle: AggregatorHandle, config: Arc, api_key_factory: Arc, ) -> Self @@ -46,7 +44,7 @@ pub trait TraceFlusher { #[derive(Clone)] #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceFlusher { - pub aggregator: Arc>, + pub aggregator_handle: AggregatorHandle, pub config: Arc, pub api_key_factory: Arc, pub additional_endpoints: Vec, @@ -55,7 +53,7 @@ pub struct ServerlessTraceFlusher { #[async_trait] impl TraceFlusher for ServerlessTraceFlusher { fn new( - aggregator: Arc>, + aggregator_handle: AggregatorHandle, config: Arc, api_key_factory: Arc, ) -> Self { @@ -77,7 +75,7 @@ impl TraceFlusher for ServerlessTraceFlusher { } ServerlessTraceFlusher { - aggregator, + aggregator_handle, config, api_key_factory, additional_endpoints, @@ -89,9 +87,8 @@ impl TraceFlusher for ServerlessTraceFlusher { error!( "TRACES | Failed to resolve API key, dropping aggregated data and skipping flushing." ); - { - let mut guard = self.aggregator.lock().await; - guard.clear(); + if let Err(e) = self.aggregator_handle.clear() { + error!("TRACES | Failed to clear aggregator data: {e}"); } return None; }; @@ -113,16 +110,13 @@ impl TraceFlusher for ServerlessTraceFlusher { } } - let mut all_batches = Vec::new(); - { - let mut guard = self.aggregator.lock().await; - let mut trace_builders = guard.get_batch(); - - while !trace_builders.is_empty() { - all_batches.push(trace_builders); - trace_builders = guard.get_batch(); + let all_batches = match self.aggregator_handle.get_batches().await { + Ok(v) => v, + Err(e) => { + error!("TRACES | Failed to fetch batches from aggregator service: {e}"); + return None; } - } + }; let mut batch_tasks = JoinSet::new(); From ddc7a01f74013eb198ae47a699e442355a65c730 Mon Sep 17 00:00:00 2001 From: shreyamalpani Date: Tue, 28 Oct 2025 10:00:22 -0400 Subject: [PATCH 2/2] shutdown aggregator services --- bottlecap/src/bin/bottlecap/main.rs | 47 +++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 7ffd9564c..c5eb06f63 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -45,7 +45,10 @@ use bottlecap::{ }, logger, logs::{ - agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService, + agent::LogsAgent, + aggregator_service::{ + AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, + }, flusher::LogsFlusher, }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, @@ -65,7 +68,9 @@ use bottlecap::{ stats_generator::StatsGenerator, stats_processor, trace_agent, trace_aggregator::SendDataBuilderInfo, - trace_aggregator_service::AggregatorService as TraceAggregatorService, + trace_aggregator_service::{ + AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService, + }, trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, trace_processor::{self, SendingTraceProcessor}, }, @@ -392,12 +397,13 @@ async fn extension_loop_active( .to_string(); let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id); - let (logs_agent_channel, logs_flusher, logs_agent_cancel_token) = start_logs_agent( - config, - Arc::clone(&api_key_factory), - &tags_provider, - event_bus_tx.clone(), - ); + let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) = + start_logs_agent( + config, + Arc::clone(&api_key_factory), + &tags_provider, + event_bus_tx.clone(), + ); let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await; @@ -436,6 +442,7 @@ async fn extension_loop_active( proxy_flusher, trace_agent_shutdown_token, stats_concentrator, + trace_aggregator_handle, ) = start_trace_agent( config, &api_key_factory, @@ -735,6 +742,15 @@ async fn extension_loop_active( true, // force_flush_trace_stats ) .await; + + // Shutdown aggregator services + if let Err(e) = logs_aggregator_handle.shutdown() { + error!("Failed to shutdown logs aggregator: {e}"); + } + if let Err(e) = trace_aggregator_handle.shutdown() { + error!("Failed to shutdown trace aggregator: {e}"); + } + return Ok(()); } } @@ -949,7 +965,12 @@ fn start_logs_agent( api_key_factory: Arc, tags_provider: &Arc, event_bus: Sender, -) -> (Sender, LogsFlusher, CancellationToken) { +) -> ( + Sender, + LogsFlusher, + CancellationToken, + LogsAggregatorHandle, +) { let (aggregator_service, aggregator_handle) = LogsAggregatorService::default(); // Start service in background tokio::spawn(async move { @@ -971,8 +992,8 @@ fn start_logs_agent( drop(agent); }); - let flusher = LogsFlusher::new(api_key_factory, aggregator_handle, config.clone()); - (tx, flusher, cancel_token) + let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone()); + (tx, flusher, cancel_token, aggregator_handle) } #[allow(clippy::type_complexity)] @@ -990,6 +1011,7 @@ fn start_trace_agent( Arc, tokio_util::sync::CancellationToken, StatsConcentratorHandle, + TraceAggregatorHandle, ) { // Stats let (stats_concentrator_service, stats_concentrator_handle) = @@ -1040,7 +1062,7 @@ fn start_trace_agent( let trace_agent = trace_agent::TraceAgent::new( Arc::clone(config), - trace_aggregator_handle, + trace_aggregator_handle.clone(), trace_processor.clone(), stats_aggregator, stats_processor, @@ -1067,6 +1089,7 @@ fn start_trace_agent( proxy_flusher, shutdown_token, stats_concentrator_handle, + trace_aggregator_handle, ) }