Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datadog-serverless-compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tokio-util = { version = "0.7", default-features = false }
tracing = { version = "0.1", default-features = false }
tracing-core = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] }
zstd = { version = "0.13.3", default-features = false }

[[bin]]
name = "datadog-serverless-compat"
2 changes: 2 additions & 0 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::{
};
use tracing::{debug, error, info};
use tracing_subscriber::EnvFilter;
use zstd::zstd_safe::CompressionLevel;

use datadog_trace_agent::{
aggregator::TraceAggregator,
Expand Down Expand Up @@ -215,6 +216,7 @@ async fn start_dogstatsd(
https_proxy,
timeout: DOGSTATSD_TIMEOUT_DURATION,
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
});
Some(metrics_flusher)
}
Expand Down
6 changes: 5 additions & 1 deletion crates/dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error};
use zstd::stream::write::Encoder;
use zstd::zstd_safe::CompressionLevel;

// TODO: Move to the more ergonomic LazyLock when MSRV is 1.80
static SITE_RE: OnceLock<Regex> = OnceLock::new();
Expand Down Expand Up @@ -138,6 +139,7 @@ pub struct DdApi {
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
client: Option<Client>,
retry_strategy: RetryStrategy,
compression_level: CompressionLevel,
}

impl DdApi {
Expand All @@ -148,6 +150,7 @@ impl DdApi {
https_proxy: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
compression_level: CompressionLevel,
) -> Self {
let client = build_client(https_proxy, timeout)
.inspect_err(|e| {
Expand All @@ -159,6 +162,7 @@ impl DdApi {
metrics_intake_url_prefix,
client,
retry_strategy,
compression_level,
}
}

Expand Down Expand Up @@ -206,7 +210,7 @@ impl DdApi {
let start = std::time::Instant::now();

let result = (|| -> std::io::Result<Vec<u8>> {
let mut encoder = Encoder::new(Vec::new(), 6)?;
let mut encoder = Encoder::new(Vec::new(), self.compression_level)?;
encoder.write_all(&body)?;
encoder.finish()
})();
Expand Down
5 changes: 5 additions & 0 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::OnceCell;
use tracing::{debug, error};
use zstd::zstd_safe::CompressionLevel;

#[derive(Clone)]
pub struct Flusher {
Expand All @@ -20,6 +21,7 @@ pub struct Flusher {
retry_strategy: RetryStrategy,
aggregator_handle: AggregatorHandle,
dd_api: OnceCell<Option<DdApi>>,
compression_level: CompressionLevel,
}

pub struct FlusherConfig {
Expand All @@ -29,6 +31,7 @@ pub struct FlusherConfig {
pub https_proxy: Option<String>,
pub timeout: Duration,
pub retry_strategy: RetryStrategy,
pub compression_level: CompressionLevel,
}

impl Flusher {
Expand All @@ -40,6 +43,7 @@ impl Flusher {
timeout: config.timeout,
retry_strategy: config.retry_strategy,
aggregator_handle: config.aggregator_handle,
compression_level: config.compression_level,
dd_api: OnceCell::new(),
}
}
Expand All @@ -55,6 +59,7 @@ impl Flusher {
self.https_proxy.clone(),
self.timeout,
self.retry_strategy.clone(),
self.compression_level.clone(),
)),
None => {
error!("Failed to create dd_api: failed to get API key");
Expand Down
6 changes: 6 additions & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::{
time::{sleep, timeout, Duration},
};
use tokio_util::sync::CancellationToken;
use zstd::zstd_safe::CompressionLevel;

#[cfg(test)]
#[tokio::test]
Expand Down Expand Up @@ -60,6 +61,8 @@ async fn dogstatsd_server_ships_series() {
https_proxy: None,
timeout: std::time::Duration::from_secs(5),
retry_strategy: RetryStrategy::Immediate(3),
compression_level: CompressionLevel::try_from(6)
.expect("failed to create compression level"),
});

let server_address = "127.0.0.1:18125";
Expand Down Expand Up @@ -137,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() {
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
);

// Create a series using the Aggregator
Expand Down Expand Up @@ -192,6 +196,7 @@ async fn test_send_with_retry_linear_backoff_success() {
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
);

// Create a series using the Aggregator
Expand Down Expand Up @@ -246,6 +251,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
None,
Duration::from_secs(1),
retry_strategy.clone(),
6,
);

// Create a series using the Aggregator
Expand Down
Loading