Merged

26 commits
5b68c4d  wip: tracing (astuyve, Jun 17, 2024)
c532836  feat: tracing WIP (astuyve, Jun 18, 2024)
8b54c2b  feat: rename mini agent to trace agent (astuyve, Jun 18, 2024)
c58c786  feat: fmt (astuyve, Jun 18, 2024)
7a98a1f  feat: Fix formatting after rename (astuyve, Jun 18, 2024)
a44be55  fix: remove extra tokio task (astuyve, Jun 18, 2024)
442039d  feat: allow tracing (astuyve, Jun 20, 2024)
8a02ce8  feat: working v5 traces (astuyve, Jun 25, 2024)
d029b50  feat: Update to use my branch of libdatadog so we have v5 support (astuyve, Jun 25, 2024)
3f99b22  Merge branch 'main' into aj/add-trace-agent (astuyve, Jun 25, 2024)
e95c8eb  feat: Update w/ libdatadog to pass trace encoding version (astuyve, Jun 26, 2024)
dcb19eb  feat: update w/ merged libdatadog changes (astuyve, Jun 27, 2024)
d53eb85  feat: Refactor trace agent, reduce code duplication, enum for trace v… (astuyve, Jun 27, 2024)
7171b61  feat: Unify config, remove trace config. Tests pass (astuyve, Jun 27, 2024)
ed76cf3  feat: fmt (astuyve, Jun 27, 2024)
e435e89  feat: fmt (astuyve, Jun 27, 2024)
2ce64fd  clippy fixes (astuyve, Jun 27, 2024)
c06c5a2  parse time (astuyve, Jun 27, 2024)
aeb64cb  feat: clippy again (astuyve, Jun 27, 2024)
e90cc9f  feat: revert dockerfile (astuyve, Jun 28, 2024)
63cfecc  feat: no-default-features (astuyve, Jun 28, 2024)
165c798  feat: Remove utils, take only what we need (astuyve, Jun 28, 2024)
d49402a  feat: fmt moves the import (astuyve, Jun 28, 2024)
86836e0  feat: replace info with debug. Replace log with tracing lib (astuyve, Jun 28, 2024)
bf040c3  feat: more debug (astuyve, Jun 28, 2024)
83da528  feat: Remove call to trace utils (astuyve, Jun 28, 2024)

791 changes: 661 additions & 130 deletions bottlecap/Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions bottlecap/Cargo.toml
@@ -5,12 +5,21 @@ edition = "2021"
publish = false

[dependencies]
async-trait = { version = "0.1.64", default-features = false }
anyhow = { version = "1.0", default-features = false }
chrono = { version = "0.4.38", features = ["serde", "std", "now"], default-features = false}
datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/" }
ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/" }
ddcommon = { version = "10.0", git = "https://github.com/DataDog/libdatadog" }
datadog-trace-protobuf = { version = "10.0.0", git = "https://github.com/DataDog/libdatadog" }
datadog-trace-utils = { version = "10.0.0", git= "https://github.com/DataDog/libdatadog" }
datadog-trace-mini-agent = { version = "0.4.2", git= "https://github.com/DataDog/libdatadog" }
datadog-trace-normalization = { version = "10.0.0", git= "https://github.com/DataDog/libdatadog" }
datadog-trace-obfuscation = { version = "10.0.0", git= "https://github.com/DataDog/libdatadog" }
figment = { version = "0.10.15", default-features = false, features = ["yaml", "env"] }
fnv = { version = "1.0.7", default-features = false }
hashbrown = { version = "0.14.3", default-features = false, features = ["inline-more"] }
hyper = { version = "0.14", default-features = false, features = ["server"] }
log = { version = "0.4.21", default-features = false }
protobuf = { version = "3.4.0", default-features = false }
regex = { version = "1.10.4", default-features = false }
@@ -29,6 +38,9 @@ hmac = { version = "0.12.1", default-features = false }
sha2 = { version = "0.10.8", default-features = false }
hex = { version = "0.4.3", default-features = false, features = ["std"] }
base64 = { version = "0.22.0", default-features = false }
rmp-serde = { version = "1.3.0", default-features = false }
rmpv = { version = "1.3.0", default-features = false }
rmp = { version = "0.8.14", default-features = false }

[dev-dependencies]
figment = { version = "0.10.15", default-features = false, features = ["yaml", "env", "test"] }
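The new rmp, rmp-serde, and rmpv dependencies appear to be pulled in for msgpack handling: the tracer ships its trace payloads to the agent msgpack-encoded, which is what the v5-encoding commits above refer to. Below is a minimal round-trip sketch of what rmp-serde enables; `DemoSpan` is an illustrative stand-in, not the actual trace wire format.

```rust
// Illustrative only: a stand-in struct, not the real v5 span layout.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct DemoSpan {
    service: String,
    name: String,
    duration_ns: u64,
}

fn main() {
    let span = DemoSpan {
        service: "my-lambda".into(),
        name: "aws.lambda".into(),
        duration_ns: 1_200_000,
    };
    // Encode with field names (map form), then decode it back.
    let bytes = rmp_serde::to_vec_named(&span).expect("msgpack encode");
    let decoded: DemoSpan = rmp_serde::from_slice(&bytes).expect("msgpack decode");
    assert_eq!(span, decoded);
}
```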
87 changes: 69 additions & 18 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -10,22 +10,6 @@
#![deny(missing_copy_implementations)]
#![deny(missing_debug_implementations)]

use decrypt::resolve_secrets;
use std::{
collections::hash_map,
collections::HashMap,
env,
io::Error,
io::Result,
os::unix::process::CommandExt,
path::Path,
process::Command,
sync::{Arc, Mutex},
};
use telemetry::listener::TelemetryListenerConfig;
use tracing::{debug, error, info};
use tracing_subscriber::EnvFilter;

use bottlecap::{
base_url,
config::{self, AwsConfig, Config},
@@ -53,10 +37,33 @@ use bottlecap::{
events::{Status, TelemetryRecord},
listener::TelemetryListener,
},
traces::{
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent,
trace_flusher::{self, TraceFlusher},
trace_processor,
},
DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST,
EXTENSION_ID_HEADER, EXTENSION_NAME, EXTENSION_NAME_HEADER, EXTENSION_ROUTE,
LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
};
use datadog_trace_obfuscation::obfuscation_config;
use decrypt::resolve_secrets;
use std::{
collections::hash_map,
collections::HashMap,
env,
io::Error,
io::Result,
os::unix::process::CommandExt,
path::Path,
process::Command,
sync::{Arc, Mutex},
};
use telemetry::listener::TelemetryListenerConfig;
use tokio::sync::Mutex as TokioMutex;
use tracing::{debug, error, info};
use tracing_subscriber::EnvFilter;

use reqwest::Client;
use serde::Deserialize;
@@ -261,6 +268,40 @@ async fn extension_loop_active(
Arc::clone(&metrics_aggr),
config.site.clone(),
);

let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
buffer: Arc::new(TokioMutex::new(Vec::new())),
});
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(
obfuscation_config::ObfuscationConfig::new()
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?,
),
});

let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
buffer: Arc::new(TokioMutex::new(Vec::new())),
config: Arc::clone(config),
});
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

let trace_flusher_clone = trace_flusher.clone();
let stats_flusher_clone = stats_flusher.clone();

let trace_agent = Box::new(trace_agent::TraceAgent {
config: Arc::clone(config),
trace_processor,
trace_flusher: trace_flusher_clone,
stats_processor,
stats_flusher: stats_flusher_clone,
tags_provider,
});
tokio::spawn(async move {
let res = trace_agent.start_trace_agent().await;
if let Err(e) = res {
error!("Error starting trace agent: {e:?}");
}
});
let lambda_enhanced_metrics = enhanced_metrics::new(Arc::clone(&metrics_aggr));
let dogstatsd_cancel_token = start_dogstatsd(event_bus.get_sender_copy(), &metrics_aggr).await;

@@ -364,7 +405,12 @@ async fn extension_loop_active(
// pass the invocation deadline to
// flush tasks here, so they can
// retry if we have more time
tokio::join!(logs_flusher.flush(), metrics_flusher.flush());
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
break;
}
TelemetryRecord::PlatformReport {
@@ -410,7 +456,12 @@ async fn extension_loop_active(
if shutdown {
dogstatsd_cancel_token.cancel();
telemetry_listener_cancel_token.cancel();
tokio::join!(logs_flusher.flush(), metrics_flusher.flush());
tokio::join!(
logs_flusher.flush(),
metrics_flusher.flush(),
trace_flusher.manual_flush(),
stats_flusher.manual_flush()
);
return Ok(());
}
}
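Both flush sites above fan the flushers out through tokio::join!, so the log, metric, trace, and stats flushes overlap on the same task instead of running back to back. A toy illustration of that pattern, with sleeps standing in for the network calls:

```rust
use std::time::{Duration, Instant};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let start = Instant::now();
    // The two "flushes" run concurrently, so this completes in ~200ms, not ~400ms.
    tokio::join!(
        sleep(Duration::from_millis(200)),
        sleep(Duration::from_millis(200)),
    );
    println!("flushed in {:?}", start.elapsed());
}
```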
14 changes: 14 additions & 0 deletions bottlecap/src/config/mod.rs
@@ -17,6 +17,7 @@ use crate::config::processing_rule::{deserialize_processing_rules, ProcessingRul
#[derive(Debug, PartialEq, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
#[serde(default)]
#[allow(clippy::struct_excessive_bools)]
pub struct Config {
pub site: String,
pub api_key: String,
@@ -33,6 +34,13 @@ pub struct Config {
pub apm_enabled: bool,
pub lambda_handler: String,
pub serverless_flush_strategy: FlushStrategy,
pub trace_enabled: bool,
pub serverless_trace_enabled: bool,
pub capture_lambda_payload: bool,
Contributor: I was gonna say should we remove this? Then I remembered that we are currently targeting Node + Python

Contributor Author: yeah need all these

// Deprecated or ignored, just here so we don't failover
pub flush_to_log: bool,
pub logs_injection: bool,
pub merge_xray_traces: bool,
}

impl Default for Config {
@@ -57,6 +65,12 @@ impl Default for Config {
// APM
apm_enabled: false,
lambda_handler: String::default(),
serverless_trace_enabled: true,
trace_enabled: true,
capture_lambda_payload: false,
flush_to_log: false,
logs_injection: false,
merge_xray_traces: false,
}
}
}
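On the review thread above: because Config is tagged with both deny_unknown_fields and default, every key a user might set has to be declared or deserialization fails outright. Keeping flush_to_log, logs_injection, and merge_xray_traces as declared-but-ignored fields is what lets existing configuration keep parsing. A self-contained sketch of that behavior, using a hypothetical `MiniConfig` in place of the real struct (assumes figment's yaml feature and serde derive, both already used by the crate):

```rust
use figment::{
    providers::{Format, Yaml},
    Figment,
};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
struct MiniConfig {
    trace_enabled: bool,
    // Deprecated but still declared, so a user-supplied value is accepted and ignored.
    flush_to_log: bool,
}

impl Default for MiniConfig {
    fn default() -> Self {
        Self {
            trace_enabled: true,
            flush_to_log: false,
        }
    }
}

fn main() {
    // Parses even though flush_to_log is deprecated; trace_enabled falls back to its default.
    let cfg: MiniConfig = Figment::new()
        .merge(Yaml::string("flush_to_log: true"))
        .extract()
        .expect("config should parse");
    assert!(cfg.trace_enabled);
    assert!(cfg.flush_to_log);
    // If flush_to_log were removed from the struct, the same YAML would be a hard error.
}
```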
1 change: 1 addition & 0 deletions bottlecap/src/lib.rs
@@ -27,6 +27,7 @@ pub mod metrics;
pub mod secrets;
pub mod tags;
pub mod telemetry;
pub mod traces;

use std::{env, io};

2 changes: 1 addition & 1 deletion bottlecap/src/secrets/decrypt.rs
@@ -2,7 +2,6 @@ use crate::config::{AwsConfig, Config};
use base64::prelude::*;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use log::error;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::Client;
use serde_json::Value;
@@ -11,6 +10,7 @@ use std::io::Error;
use std::sync::Arc;
use std::time::Instant;
use tracing::debug;
use tracing::error;

pub async fn resolve_secrets(config: Arc<Config>, aws_config: &AwsConfig) -> Option<String> {
if !config.api_key.is_empty() {
5 changes: 5 additions & 0 deletions bottlecap/src/tags/lambda/tags.rs
@@ -155,6 +155,11 @@ impl Lambda {
pub fn get_function_arn(&self) -> Option<&String> {
self.tags_map.get(FUNCTION_ARN_KEY)
}

#[must_use]
pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
&self.tags_map
}
}

#[cfg(test)]
12 changes: 12 additions & 0 deletions bottlecap/src/tags/provider.rs
@@ -46,11 +46,17 @@ impl Provider {
pub fn get_canonical_id(&self) -> Option<String> {
self.tag_provider.get_canonical_id()
}

#[must_use]
pub fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
self.tag_provider.get_tags_map()
}
}

trait GetTags {
fn get_tags_vec(&self) -> Vec<String>;
fn get_canonical_id(&self) -> Option<String>;
fn get_tags_map(&self) -> &hash_map::HashMap<String, String>;
}

impl GetTags for TagProvider {
@@ -65,6 +71,12 @@ impl GetTags for TagProvider {
TagProvider::Lambda(lambda_tags) => lambda_tags.get_function_arn().cloned(),
}
}

fn get_tags_map(&self) -> &hash_map::HashMap<String, String> {
match self {
TagProvider::Lambda(lambda_tags) => lambda_tags.get_tags_map(),
}
}
}

#[cfg(test)]
8 changes: 8 additions & 0 deletions bottlecap/src/traces/mod.rs
@@ -0,0 +1,8 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_agent;
pub mod trace_flusher;
pub mod trace_processor;
92 changes: 92 additions & 0 deletions bottlecap/src/traces/stats_flusher.rs
@@ -0,0 +1,92 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{mpsc::Receiver, Mutex};
use tracing::{debug, error};

use crate::config;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::config_utils::trace_stats_url;
use datadog_trace_utils::stats_utils;
use ddcommon::Endpoint;

#[async_trait]
pub trait StatsFlusher {
/// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver,
/// implementing flushing logic that calls flush_stats.
async fn start_stats_flusher(&self, mut rx: Receiver<pb::ClientStatsPayload>);
/// Flushes stats to the Datadog trace stats intake.
async fn flush_stats(&self, traces: Vec<pb::ClientStatsPayload>);

async fn manual_flush(&self);
}

#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct ServerlessStatsFlusher {
pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
pub config: Arc<config::Config>,
}

#[async_trait]
impl StatsFlusher for ServerlessStatsFlusher {
async fn start_stats_flusher(&self, mut rx: Receiver<pb::ClientStatsPayload>) {
let buffer_producer = self.buffer.clone();

tokio::spawn(async move {
while let Some(stats_payload) = rx.recv().await {
let mut buffer = buffer_producer.lock().await;
buffer.push(stats_payload);
}
});
}

async fn manual_flush(&self) {
let mut buffer = self.buffer.lock().await;
if !buffer.is_empty() {
self.flush_stats(buffer.to_vec()).await;
buffer.clear();
}
}
async fn flush_stats(&self, stats: Vec<pb::ClientStatsPayload>) {
if stats.is_empty() {
return;
}
debug!("Flushing {} stats", stats.len());

let stats_payload = stats_utils::construct_stats_payload(stats);

debug!("Stats payload to be sent: {stats_payload:?}");

let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) {
Ok(res) => res,
Err(err) => {
error!("Failed to serialize stats payload, dropping stats: {err}");
return;
}
};

let stats_url = trace_stats_url(&self.config.site);

let endpoint = Endpoint {
url: hyper::Uri::from_str(&stats_url).expect("can't make URI from stats url, exiting"),
api_key: Some(self.config.api_key.clone().into()),
};

match stats_utils::send_stats_payload(
serialized_stats_payload,
&endpoint,
&self.config.api_key,
)
.await
{
Ok(()) => debug!("Successfully flushed stats"),
Err(e) => {
error!("Error sending stats: {e:?}");
}
}
}
}
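For orientation, a hedged sketch of how this trait is meant to be driven: the trace agent (not part of this file) would own the Sender half of the channel and push ClientStatsPayload values into it, while the extension loop in main.rs calls manual_flush at the end of each invocation. The helper name and the channel capacity below are assumptions, not code from the PR.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

/// Hypothetical wiring helper: build the flusher, start its background
/// consumer, and hand back the Sender a stats processor would push into.
async fn wire_stats_pipeline(
    config: Arc<config::Config>,
) -> (Arc<ServerlessStatsFlusher>, mpsc::Sender<pb::ClientStatsPayload>) {
    let flusher = Arc::new(ServerlessStatsFlusher {
        buffer: Arc::new(Mutex::new(Vec::new())),
        config,
    });
    // Capacity is an arbitrary choice for the sketch.
    let (tx, rx) = mpsc::channel(100);
    // Spawns the task that drains the channel into the shared buffer.
    flusher.start_stats_flusher(rx).await;
    (flusher, tx)
}

// Later, at the end of an invocation (see the main.rs changes above):
// flusher.manual_flush().await;
```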