diff --git a/Cargo.lock b/Cargo.lock index e5769b0..a08a686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,23 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "datadog-serverless-compat" +version = "0.1.0" +dependencies = [ + "datadog-trace-protobuf", + "datadog-trace-utils", + "dogstatsd", + "env_logger", + "log", + "tokio", + "tokio-util", + "trace-agent", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "datadog-trace-normalization" version = "17.0.0" @@ -1040,6 +1057,19 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1383,6 +1413,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" +[[package]] +name = "hermit-abi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbd780fe5cc30f81464441920d82ac8740e2e46b29a6fad543ddd075229ce37e" + [[package]] name = "hex" version = "0.4.3" @@ -1550,6 +1586,12 @@ dependencies = [ "url", ] +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "0.14.32" @@ -1891,6 +1933,17 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi 0.5.0", + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -2426,7 +2479,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi", + "hermit-abi 0.4.0", "pin-project-lite", "rustix 0.38.44", "tracing", @@ -3487,6 +3540,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "testcontainers" version = "0.22.0" diff --git a/Cargo.toml b/Cargo.toml index f084484..cb438b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,9 @@ edition = "2021" license = "Apache-2.0" homepage = "https://github.com/DataDog/serverless-components" repository = "https://github.com/DataDog/serverless-components" + +[profile.release] +opt-level = "z" +lto = true +codegen-units = 1 +strip = true diff --git a/crates/serverless-compat/Cargo.toml b/crates/serverless-compat/Cargo.toml new file mode 100644 index 0000000..16398ee --- /dev/null +++ b/crates/serverless-compat/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "datadog-serverless-compat" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Binary to run trace-agent and dogstatsd servers in Serverless environments" + +[dependencies] +log = "0.4" +env_logger = "0.10.0" +trace-agent = { path = "../trace-agent" } +datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" } +datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "3dab0bed2e144ce78c10a2378d1aff8fb5974f7d" } +dogstatsd = { path = "../dogstatsd", default-features = false } +tokio = { version = "1", features = ["macros", "rt-multi-thread"]} +tokio-util = { version = "0.7", default-features = false } +tracing = { version = "0.1", default-features = false } +tracing-core = { version = "0.1", default-features = false } +tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] } + +[[bin]] +name = "datadog-serverless-compat" diff --git a/crates/serverless-compat/src/main.rs b/crates/serverless-compat/src/main.rs new file mode 100644 index 0000000..7f1cce0 --- /dev/null +++ b/crates/serverless-compat/src/main.rs @@ -0,0 +1,226 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +use env_logger::Builder; +use log::{debug, error, info}; +use std::{env, str::FromStr, sync::Arc, sync::Mutex}; +use tokio::{ + sync::Mutex as TokioMutex, + time::{interval, Duration}, +}; +use tracing_subscriber::EnvFilter; + +use trace_agent::{ + aggregator::TraceAggregator, + config, env_verifier, mini_agent, stats_flusher, stats_processor, + trace_flusher::{self, TraceFlusher}, + trace_processor, +}; + +use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; + +use dogstatsd::{ + aggregator::Aggregator as MetricsAggregator, + constants::CONTEXTS, + datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site}, + dogstatsd::{DogStatsD, DogStatsDConfig}, + flusher::{Flusher, FlusherConfig}, +}; + +use dogstatsd::metric::{SortedTags, EMPTY_TAGS}; +use tokio_util::sync::CancellationToken; + +const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; +const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); +const DEFAULT_DOGSTATSD_PORT: u16 = 8125; +const AGENT_HOST: &str = "0.0.0.0"; + +#[tokio::main] +pub async fn main() { + let log_level = env::var("DD_LOG_LEVEL") + .map(|val| val.to_lowercase()) + .unwrap_or("info".to_string()); + let level_filter = log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info); + Builder::new().filter_level(level_filter).init(); + + let (_, env_type) = match read_cloud_env() { + Some(value) => value, + None => { + error!("Unable to identify environment. Shutting down Mini Agent."); + return; + } + }; + + let dogstatsd_tags = match env_type { + EnvironmentType::CloudFunction => "origin:cloudfunction,dd.origin:cloudfunction", + EnvironmentType::AzureFunction => "origin:azurefunction,dd.origin:azurefunction", + EnvironmentType::AzureSpringApp => "origin:azurespringapp,dd.origin:azurespringapp", + EnvironmentType::LambdaFunction => "origin:lambda,dd.origin:lambda", // historical reasons + }; + + let dd_api_key: Option = env::var("DD_API_KEY").ok(); + let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_DOGSTATSD_PORT); + let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); + let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + + let https_proxy = env::var("DD_PROXY_HTTPS") + .or_else(|_| env::var("HTTPS_PROXY")) + .ok(); + debug!("Starting serverless trace mini agent"); + + let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level); + + #[allow(clippy::expect_used)] + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .with_env_filter( + EnvFilter::try_new(env_filter).expect("could not parse log level in configuration"), + ) + .with_level(true) + .with_thread_names(false) + .with_thread_ids(false) + .with_line_number(false) + .with_file(false) + .with_target(false) + .without_time() + .finish(); + + #[allow(clippy::expect_used)] + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + debug!("Logging subsystem enabled"); + + let env_verifier = Arc::new(env_verifier::ServerlessEnvVerifier::default()); + + let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {}); + + let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {}); + let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {}); + + let config = match config::Config::new() { + Ok(c) => Arc::new(c), + Err(e) => { + error!("Error creating config on serverless trace mini agent startup: {e}"); + return; + } + }; + + let trace_aggregator = Arc::new(TokioMutex::new(TraceAggregator::default())); + let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new( + trace_aggregator, + Arc::clone(&config), + )); + + let mini_agent = Box::new(mini_agent::MiniAgent { + config: Arc::clone(&config), + env_verifier, + trace_processor, + trace_flusher, + stats_processor, + stats_flusher, + }); + + tokio::spawn(async move { + let res = mini_agent.start_mini_agent().await; + if let Err(e) = res { + error!("Error when starting serverless trace mini agent: {e:?}"); + } + }); + + let mut metrics_flusher = if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let (_, metrics_flusher) = start_dogstatsd( + dd_dogstatsd_port, + dd_api_key, + dd_site, + https_proxy, + dogstatsd_tags, + ) + .await; + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + metrics_flusher + } else { + info!("dogstatsd disabled"); + None + }; + + let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + flush_interval.tick().await; // discard first tick, which is instantaneous + + loop { + flush_interval.tick().await; + + if let Some(metrics_flusher) = metrics_flusher.as_mut() { + debug!("Flushing dogstatsd metrics"); + metrics_flusher.flush().await; + } + } +} + +async fn start_dogstatsd( + port: u16, + dd_api_key: Option, + dd_site: String, + https_proxy: Option, + dogstatsd_tags: &str, +) -> (CancellationToken, Option) { + #[allow(clippy::expect_used)] + let metrics_aggr = Arc::new(Mutex::new( + MetricsAggregator::new( + SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), + CONTEXTS, + ) + .expect("Failed to create metrics aggregator"), + )); + + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + Arc::clone(&metrics_aggr), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { + dogstatsd_client.spin().await; + }); + + let metrics_flusher = match dd_api_key { + Some(dd_api_key) => { + #[allow(clippy::expect_used)] + let metrics_flusher = Flusher::new(FlusherConfig { + api_key: dd_api_key, + aggregator: Arc::clone(&metrics_aggr), + metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( + Some(Site::new(dd_site).expect("Failed to parse site")), + None, + ) + .expect("Failed to create intake URL prefix"), + https_proxy, + timeout: DOGSTATSD_TIMEOUT_DURATION, + retry_strategy: RetryStrategy::LinearBackoff(3, 1), + }); + Some(metrics_flusher) + } + None => { + error!("DD_API_KEY not set, won't flush metrics"); + None + } + }; + + (dogstatsd_cancel_token, metrics_flusher) +}