54 changes: 40 additions & 14 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -45,7 +45,10 @@ use bottlecap::{
     },
     logger,
     logs::{
-        agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService,
+        agent::LogsAgent,
+        aggregator_service::{
+            AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
+        },
         flusher::LogsFlusher,
     },
     otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
@@ -64,7 +67,10 @@ use bottlecap::{
         stats_flusher::{self, StatsFlusher},
         stats_generator::StatsGenerator,
         stats_processor, trace_agent,
-        trace_aggregator::{self, SendDataBuilderInfo},
+        trace_aggregator::SendDataBuilderInfo,
+        trace_aggregator_service::{
+            AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
+        },
         trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
         trace_processor::{self, SendingTraceProcessor},
     },
@@ -391,12 +397,13 @@ async fn extension_loop_active(
         .to_string();
     let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);

-    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token) = start_logs_agent(
-        config,
-        Arc::clone(&api_key_factory),
-        &tags_provider,
-        event_bus_tx.clone(),
-    );
+    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
+        start_logs_agent(
+            config,
+            Arc::clone(&api_key_factory),
+            &tags_provider,
+            event_bus_tx.clone(),
+        );

     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
         start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -435,6 +442,7 @@ async fn extension_loop_active(
         proxy_flusher,
         trace_agent_shutdown_token,
         stats_concentrator,
+        trace_aggregator_handle,
     ) = start_trace_agent(
         config,
         &api_key_factory,
@@ -734,6 +742,15 @@ async fn extension_loop_active(
                 true, // force_flush_trace_stats
             )
             .await;
+
+            // Shutdown aggregator services
+            if let Err(e) = logs_aggregator_handle.shutdown() {
+                error!("Failed to shutdown logs aggregator: {e}");
+            }
+            if let Err(e) = trace_aggregator_handle.shutdown() {
+                error!("Failed to shutdown trace aggregator: {e}");
+            }
+
             return Ok(());
         }
     }
@@ -948,7 +965,12 @@ fn start_logs_agent(
     api_key_factory: Arc<ApiKeyFactory>,
     tags_provider: &Arc<TagProvider>,
     event_bus: Sender<Event>,
-) -> (Sender<TelemetryEvent>, LogsFlusher, CancellationToken) {
+) -> (
+    Sender<TelemetryEvent>,
+    LogsFlusher,
+    CancellationToken,
+    LogsAggregatorHandle,
+) {
     let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
     // Start service in background
     tokio::spawn(async move {
@@ -970,8 +992,8 @@ fn start_logs_agent(
         drop(agent);
     });

-    let flusher = LogsFlusher::new(api_key_factory, aggregator_handle, config.clone());
-    (tx, flusher, cancel_token)
+    let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
+    (tx, flusher, cancel_token, aggregator_handle)
 }

 #[allow(clippy::type_complexity)]
@@ -989,6 +1011,7 @@ fn start_trace_agent(
     Arc<ProxyFlusher>,
     tokio_util::sync::CancellationToken,
     StatsConcentratorHandle,
+    TraceAggregatorHandle,
 ) {
     // Stats
     let (stats_concentrator_service, stats_concentrator_handle) =
@@ -1006,9 +1029,11 @@ fn start_trace_agent(
     let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

     // Traces
-    let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
+    let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
+    tokio::spawn(trace_aggregator_service.run());

     let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
-        trace_aggregator.clone(),
+        trace_aggregator_handle.clone(),
         config.clone(),
         api_key_factory.clone(),
@@ -1037,7 +1062,7 @@ fn start_trace_agent(

     let trace_agent = trace_agent::TraceAgent::new(
         Arc::clone(config),
-        trace_aggregator,
+        trace_aggregator_handle.clone(),
         trace_processor.clone(),
         stats_aggregator,
         stats_processor,
@@ -1064,6 +1089,7 @@ fn start_trace_agent(
         proxy_flusher,
         shutdown_token,
         stats_concentrator_handle,
+        trace_aggregator_handle,
     )
 }
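The recurring change in this file replaces the shared Arc<Mutex<TraceAggregator>> with an actor-style service that owns the aggregator state, plus a cloneable handle that sends it commands over a channel, so producers and flushers no longer contend on a lock. A minimal, self-contained sketch of that pattern follows; the Service/Handle/Command names are illustrative, not the crate's actual API:

    use tokio::sync::mpsc;

    enum Command {
        Insert(u64),
        Shutdown,
    }

    #[derive(Clone)]
    struct Handle {
        tx: mpsc::UnboundedSender<Command>,
    }

    struct Service {
        total: u64,
        rx: mpsc::UnboundedReceiver<Command>,
    }

    impl Service {
        fn new() -> (Self, Handle) {
            let (tx, rx) = mpsc::unbounded_channel();
            (Self { total: 0, rx }, Handle { tx })
        }

        async fn run(mut self) {
            // The service owns its state outright; no Mutex is needed because
            // commands are processed one at a time off the channel.
            while let Some(cmd) = self.rx.recv().await {
                match cmd {
                    Command::Insert(n) => self.total += n,
                    Command::Shutdown => break,
                }
            }
        }
    }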
1 change: 1 addition & 0 deletions bottlecap/src/traces/mod.rs
@@ -13,6 +13,7 @@ pub mod stats_generator;
 pub mod stats_processor;
 pub mod trace_agent;
 pub mod trace_aggregator;
+pub mod trace_aggregator_service;
 pub mod trace_flusher;
 pub mod trace_processor;
10 changes: 6 additions & 4 deletions bottlecap/src/traces/trace_agent.rs
@@ -36,7 +36,8 @@ use crate::{
         stats_aggregator,
         stats_generator::StatsGenerator,
         stats_processor,
-        trace_aggregator::{self, SendDataBuilderInfo},
+        trace_aggregator::SendDataBuilderInfo,
+        trace_aggregator_service::AggregatorHandle,
         trace_processor,
     },
 };
@@ -123,7 +124,7 @@ impl TraceAgent {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         config: Arc<config::Config>,
-        trace_aggregator: Arc<Mutex<trace_aggregator::TraceAggregator>>,
+        aggregator_handle: AggregatorHandle,
         trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
         stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
         stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
@@ -142,8 +143,9 @@ impl TraceAgent {
         // Start the trace aggregator, which receives and buffers trace payloads to be consumed by the trace flusher.
         tokio::spawn(async move {
             while let Some(tracer_payload_info) = trace_rx.recv().await {
-                let mut aggregator = trace_aggregator.lock().await;
-                aggregator.add(tracer_payload_info);
+                if let Err(e) = aggregator_handle.insert_payload(tracer_payload_info) {
+                    error!("TRACE_AGENT | Failed to insert payload into aggregator: {e}");
+                }
             }
         });
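One consequence of this change: the forwarding task above never awaits a lock, since insert_payload is a synchronous unbounded-channel send that fails only after the aggregator service has shut down, in which case tokio's SendError simply wraps the rejected command. A small illustrative sketch of that failure mode; the forward function and String payload are hypothetical, not the crate's API:

    use tokio::sync::mpsc;

    fn forward(tx: &mpsc::UnboundedSender<String>, payload: String) {
        // An unbounded send never blocks; Err means the receiving service
        // is gone, and SendError's field is the value that was not delivered.
        if let Err(mpsc::error::SendError(rejected)) = tx.send(payload) {
            eprintln!("aggregator is gone; dropping payload: {rejected}");
        }
    }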
155 changes: 155 additions & 0 deletions bottlecap/src/traces/trace_aggregator_service.rs
@@ -0,0 +1,155 @@
use datadog_trace_utils::send_data::SendDataBuilder;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error};

use crate::traces::trace_aggregator::{
    MAX_CONTENT_SIZE_BYTES, SendDataBuilderInfo, TraceAggregator,
};

pub enum AggregatorCommand {
    InsertPayload(SendDataBuilderInfo),
    GetBatches(oneshot::Sender<Vec<Vec<SendDataBuilder>>>),
    Clear,
    Shutdown,
}

#[derive(Clone, Debug)]
pub struct AggregatorHandle {
    tx: mpsc::UnboundedSender<AggregatorCommand>,
}

impl AggregatorHandle {
    pub fn insert_payload(
        &self,
        payload_info: SendDataBuilderInfo,
    ) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        self.tx.send(AggregatorCommand::InsertPayload(payload_info))
    }

    pub async fn get_batches(&self) -> Result<Vec<Vec<SendDataBuilder>>, String> {
        let (response_tx, response_rx) = oneshot::channel();
        self.tx
            .send(AggregatorCommand::GetBatches(response_tx))
            .map_err(|e| format!("Failed to send flush command: {e}"))?;

        response_rx
            .await
            .map_err(|e| format!("Failed to receive flush response: {e}"))
    }

    pub fn clear(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        self.tx.send(AggregatorCommand::Clear)
    }

    pub fn shutdown(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        self.tx.send(AggregatorCommand::Shutdown)
    }
}

pub struct AggregatorService {
    aggregator: TraceAggregator,
    rx: mpsc::UnboundedReceiver<AggregatorCommand>,
}

impl AggregatorService {
    #[must_use]
    #[allow(clippy::should_implement_trait)]
    pub fn default() -> (Self, AggregatorHandle) {
        Self::new(MAX_CONTENT_SIZE_BYTES)
    }

    #[must_use]
    pub fn new(max_content_size_bytes: usize) -> (Self, AggregatorHandle) {
        let (tx, rx) = mpsc::unbounded_channel();
        let aggregator = TraceAggregator::new(max_content_size_bytes);

        let service = Self { aggregator, rx };
        let handle = AggregatorHandle { tx };

        (service, handle)
    }

    pub async fn run(mut self) {
        debug!("Trace aggregator service started");

        while let Some(command) = self.rx.recv().await {
            match command {
                AggregatorCommand::InsertPayload(payload_info) => {
                    self.aggregator.add(payload_info);
                }
                AggregatorCommand::GetBatches(response_tx) => {
                    let mut batches = Vec::new();
                    let mut current_batch = self.aggregator.get_batch();
                    while !current_batch.is_empty() {
                        batches.push(current_batch);
                        current_batch = self.aggregator.get_batch();
                    }
                    if response_tx.send(batches).is_err() {
                        error!("Failed to send trace flush response - receiver dropped");
                    }
Review thread (on the GetBatches response handling):

Contributor: Would be great to log the error details.

Contributor Author: I think the error here just contains the data that failed to send.

Contributor (litianningdatadog, Oct 28, 2025): I am thinking of something like below. It will give us more context in case of errors.

    if let Err(e) = response_tx.send(batches) {
        error!("Failed to send trace flush response : {}", e);
    }

Contributor Author: I tried this, but here the error is the Vec<Vec<SendDataBuilder>> of data that failed to send and doesn't have additional context, I believe.
                }
                AggregatorCommand::Clear => {
                    self.aggregator.clear();
                }
                AggregatorCommand::Shutdown => {
                    debug!("Trace aggregator service shutting down");
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datadog_trace_utils::{
        trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
    };
    use ddcommon::Endpoint;

    #[tokio::test]
    async fn test_aggregator_service_insert_and_flush() {
        let (service, handle) = AggregatorService::default();

        let service_handle = tokio::spawn(async move {
            service.run().await;
        });

        let tracer_header_tags = TracerHeaderTags {
            lang: "lang",
            lang_version: "lang_version",
            lang_interpreter: "lang_interpreter",
            lang_vendor: "lang_vendor",
            tracer_version: "tracer_version",
            container_id: "container_id",
            client_computed_top_level: true,
            client_computed_stats: true,
            dropped_p0_traces: 0,
            dropped_p0_spans: 0,
        };
        let size = 1;
        let payload = SendDataBuilder::new(
            size,
            TracerPayloadCollection::V07(Vec::new()),
            tracer_header_tags,
            &Endpoint::from_slice("localhost"),
        );

        handle
            .insert_payload(SendDataBuilderInfo::new(payload, size))
            .unwrap();

        let batches = handle.get_batches().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 1);

        handle
            .shutdown()
            .expect("Failed to shutdown aggregator service");
        service_handle
            .await
            .expect("Aggregator service task failed");
    }
}
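For context on how a caller drives this service: GetBatches drains the aggregator batch by batch (each inner Vec capped at MAX_CONTENT_SIZE_BYTES) until it is empty. A rough sketch of a flusher loop over the handle; deliver is a hypothetical stand-in for the real SendDataBuilder-to-network path:

    use datadog_trace_utils::send_data::SendDataBuilder;

    use crate::traces::trace_aggregator_service::AggregatorHandle;

    // Hypothetical delivery step, standing in for the real network send.
    async fn deliver(_batch: Vec<SendDataBuilder>) {}

    async fn flush_all(handle: &AggregatorHandle) {
        match handle.get_batches().await {
            Ok(batches) => {
                for batch in batches {
                    deliver(batch).await;
                }
            }
            Err(e) => tracing::error!("Failed to fetch trace batches: {e}"),
        }
    }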