From 54e80ccef95d63cb0a98f19e03b53ab27bc62824 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Fri, 22 Aug 2025 12:56:59 -0400 Subject: [PATCH 1/2] https://datadoghq.atlassian.net/browse/SVLS-7461 feat: Add configurable compression level to DogStatsD metrics submission Add compression_level field to FlusherConfig and DdApi to enable configurable zstd compression for metrics payloads. --- Cargo.lock | 1 + crates/datadog-serverless-compat/Cargo.toml | 1 + crates/datadog-serverless-compat/src/main.rs | 3 +++ crates/dogstatsd/src/datadog.rs | 6 +++++- crates/dogstatsd/src/flusher.rs | 5 +++++ crates/dogstatsd/tests/integration_test.rs | 6 ++++++ 6 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ed681d9..a592725 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -781,6 +781,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "zstd", ] [[package]] diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 1bcab6c..1d2e1fa 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -17,6 +17,7 @@ tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-core = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] } +zstd = { version = "0.13.3", default-features = false } [[bin]] name = "datadog-serverless-compat" diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 9a05a5a..975ad37 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -15,6 +15,7 @@ use tokio::{ time::{interval, Duration}, }; use tracing_subscriber::EnvFilter; +use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, @@ -218,6 +219,8 @@ async fn start_dogstatsd( https_proxy, timeout: DOGSTATSD_TIMEOUT_DURATION, retry_strategy: RetryStrategy::LinearBackoff(3, 1), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), }); Some(metrics_flusher) } diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index c2f2b06..7e1401e 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -19,6 +19,7 @@ use std::sync::OnceLock; use std::time::Duration; use tracing::{debug, error}; use zstd::stream::write::Encoder; +use zstd::zstd_safe::CompressionLevel; // TODO: Move to the more ergonomic LazyLock when MSRV is 1.80 static SITE_RE: OnceLock = OnceLock::new(); @@ -138,6 +139,7 @@ pub struct DdApi { metrics_intake_url_prefix: MetricsIntakeUrlPrefix, client: Option, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, } impl DdApi { @@ -148,6 +150,7 @@ impl DdApi { https_proxy: Option, timeout: Duration, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, ) -> Self { let client = build_client(https_proxy, timeout) .inspect_err(|e| { @@ -159,6 +162,7 @@ impl DdApi { metrics_intake_url_prefix, client, retry_strategy, + compression_level, } } @@ -206,7 +210,7 @@ impl DdApi { let start = std::time::Instant::now(); let result = (|| -> std::io::Result> { - let mut encoder = Encoder::new(Vec::new(), 6)?; + let mut encoder = Encoder::new(Vec::new(), self.compression_level)?; encoder.write_all(&body)?; encoder.finish() })(); diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index c114405..b523a93 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::OnceCell; use tracing::{debug, error}; +use zstd::zstd_safe::CompressionLevel; #[derive(Clone)] pub struct Flusher { @@ -20,6 +21,7 @@ pub struct Flusher { retry_strategy: RetryStrategy, aggregator_handle: AggregatorHandle, dd_api: OnceCell>, + compression_level: CompressionLevel, } pub struct FlusherConfig { @@ -29,6 +31,7 @@ pub struct FlusherConfig { pub https_proxy: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, + pub compression_level: CompressionLevel, } impl Flusher { @@ -40,6 +43,7 @@ impl Flusher { timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator_handle: config.aggregator_handle, + compression_level: config.compression_level, dd_api: OnceCell::new(), } } @@ -55,6 +59,7 @@ impl Flusher { self.https_proxy.clone(), self.timeout, self.retry_strategy.clone(), + self.compression_level.clone(), )), None => { error!("Failed to create dd_api: failed to get API key"); diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 4d07f45..4be84bf 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -18,6 +18,7 @@ use tokio::{ time::{sleep, timeout, Duration}, }; use tokio_util::sync::CancellationToken; +use zstd::zstd_safe::CompressionLevel; #[cfg(test)] #[tokio::test] @@ -60,6 +61,8 @@ async fn dogstatsd_server_ships_series() { https_proxy: None, timeout: std::time::Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(3), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), }); let server_address = "127.0.0.1:18125"; @@ -137,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -192,6 +196,7 @@ async fn test_send_with_retry_linear_backoff_success() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -246,6 +251,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator From 50a8ffcdfce0037377855354694c5d35fe7ebfa9 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Fri, 22 Aug 2025 12:56:59 -0400 Subject: [PATCH 2/2] https://datadoghq.atlassian.net/browse/SVLS-7461 feat: Add configurable compression level to DogStatsD metrics submission Add compression_level field to FlusherConfig and DdApi to enable configurable zstd compression for metrics payloads. --- Cargo.lock | 1 + crates/datadog-serverless-compat/Cargo.toml | 1 + crates/datadog-serverless-compat/src/main.rs | 3 +++ crates/dogstatsd/src/datadog.rs | 6 +++++- crates/dogstatsd/src/flusher.rs | 5 +++++ crates/dogstatsd/tests/integration_test.rs | 6 ++++++ 6 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 79fa2f4..e6fde7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-subscriber", + "zstd", ] [[package]] diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 62f9480..3b53276 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -15,6 +15,7 @@ tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-core = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] } +zstd = { version = "0.13.3", default-features = false } [[bin]] name = "datadog-serverless-compat" diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 043548a..db895a1 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -14,6 +14,7 @@ use tokio::{ }; use tracing::{debug, error, info}; use tracing_subscriber::EnvFilter; +use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, @@ -215,6 +216,8 @@ async fn start_dogstatsd( https_proxy, timeout: DOGSTATSD_TIMEOUT_DURATION, retry_strategy: RetryStrategy::LinearBackoff(3, 1), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), }); Some(metrics_flusher) } diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index c2f2b06..7e1401e 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -19,6 +19,7 @@ use std::sync::OnceLock; use std::time::Duration; use tracing::{debug, error}; use zstd::stream::write::Encoder; +use zstd::zstd_safe::CompressionLevel; // TODO: Move to the more ergonomic LazyLock when MSRV is 1.80 static SITE_RE: OnceLock = OnceLock::new(); @@ -138,6 +139,7 @@ pub struct DdApi { metrics_intake_url_prefix: MetricsIntakeUrlPrefix, client: Option, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, } impl DdApi { @@ -148,6 +150,7 @@ impl DdApi { https_proxy: Option, timeout: Duration, retry_strategy: RetryStrategy, + compression_level: CompressionLevel, ) -> Self { let client = build_client(https_proxy, timeout) .inspect_err(|e| { @@ -159,6 +162,7 @@ impl DdApi { metrics_intake_url_prefix, client, retry_strategy, + compression_level, } } @@ -206,7 +210,7 @@ impl DdApi { let start = std::time::Instant::now(); let result = (|| -> std::io::Result> { - let mut encoder = Encoder::new(Vec::new(), 6)?; + let mut encoder = Encoder::new(Vec::new(), self.compression_level)?; encoder.write_all(&body)?; encoder.finish() })(); diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index c114405..b523a93 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::OnceCell; use tracing::{debug, error}; +use zstd::zstd_safe::CompressionLevel; #[derive(Clone)] pub struct Flusher { @@ -20,6 +21,7 @@ pub struct Flusher { retry_strategy: RetryStrategy, aggregator_handle: AggregatorHandle, dd_api: OnceCell>, + compression_level: CompressionLevel, } pub struct FlusherConfig { @@ -29,6 +31,7 @@ pub struct FlusherConfig { pub https_proxy: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, + pub compression_level: CompressionLevel, } impl Flusher { @@ -40,6 +43,7 @@ impl Flusher { timeout: config.timeout, retry_strategy: config.retry_strategy, aggregator_handle: config.aggregator_handle, + compression_level: config.compression_level, dd_api: OnceCell::new(), } } @@ -55,6 +59,7 @@ impl Flusher { self.https_proxy.clone(), self.timeout, self.retry_strategy.clone(), + self.compression_level.clone(), )), None => { error!("Failed to create dd_api: failed to get API key"); diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 4d07f45..4be84bf 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -18,6 +18,7 @@ use tokio::{ time::{sleep, timeout, Duration}, }; use tokio_util::sync::CancellationToken; +use zstd::zstd_safe::CompressionLevel; #[cfg(test)] #[tokio::test] @@ -60,6 +61,8 @@ async fn dogstatsd_server_ships_series() { https_proxy: None, timeout: std::time::Duration::from_secs(5), retry_strategy: RetryStrategy::Immediate(3), + compression_level: CompressionLevel::try_from(6) + .expect("failed to create compression level"), }); let server_address = "127.0.0.1:18125"; @@ -137,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -192,6 +196,7 @@ async fn test_send_with_retry_linear_backoff_success() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator @@ -246,6 +251,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { None, Duration::from_secs(1), retry_strategy.clone(), + 6, ); // Create a series using the Aggregator