diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 05693d4..fb2418f 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -35,6 +35,7 @@ bitflags,https://github.com/bitflags/bitflags,MIT OR Apache-2.0,The Rust Project block-buffer,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers blocking,https://github.com/smol-rs/blocking,Apache-2.0 OR MIT,Stjepan Glavina bollard,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contributors +bollard-stubs,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contributors bumpalo,https://github.com/fitzgen/bumpalo,MIT OR Apache-2.0,Nick Fitzgerald byteorder,https://github.com/BurntSushi/byteorder,Unlicense OR MIT,Andrew Gallant bytes,https://github.com/tokio-rs/bytes,MIT,"Carl Lerche , Sean McArthur " @@ -50,12 +51,15 @@ concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT," const_format,https://github.com/rodrimati1992/const_format_crates,Zlib,rodrimati1992 const_format_proc_macros,https://github.com/rodrimati1992/const_format_crates,Zlib,rodrimati1992 core-foundation,https://github.com/servo/core-foundation-rs,MIT OR Apache-2.0,The Servo Project Developers +core-foundation-sys,https://github.com/servo/core-foundation-rs,MIT OR Apache-2.0,The Servo Project Developers cpufeatures,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs , Alex Crichton " crossbeam-utils,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-utils Authors crunchy,https://github.com/eira-fransham/crunchy,MIT,Eira Fransham crypto-common,https://github.com/RustCrypto/traits,MIT OR Apache-2.0,RustCrypto Developers darling,https://github.com/TedDriggs/darling,MIT,Ted Driggs +darling_core,https://github.com/TedDriggs/darling,MIT,Ted Driggs +darling_macro,https://github.com/TedDriggs/darling,MIT,Ted Driggs dashmap,https://github.com/xacrimon/dashmap,MIT,Acrimon data-encoding,https://github.com/ia0/data-encoding,MIT,Julien Cretin datadog-protos,https://github.com/DataDog/saluki,Apache-2.0,The datadog-protos Authors @@ -67,6 +71,7 @@ ddcommon,https://github.com/DataDog/libdatadog,Apache-2.0,The ddcommon Authors ddsketch-agent,https://github.com/DataDog/saluki,Apache-2.0,The ddsketch-agent Authors deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt derive_more,https://github.com/JelteF/derive_more,MIT,Jelte Fennema +derive_more-impl,https://github.com/JelteF/derive_more,MIT,Jelte Fennema digest,https://github.com/RustCrypto/traits,MIT OR Apache-2.0,RustCrypto Developers dirs,https://github.com/soc/dirs-rs,MIT OR Apache-2.0,Simon Ochsenreither dirs-next,https://github.com/xdg-rs/dirs,MIT OR Apache-2.0,The @xdg-rs members @@ -89,6 +94,7 @@ fixedbitset,https://github.com/petgraph/fixedbitset,MIT OR Apache-2.0,bluss flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett " float-cmp,https://github.com/mikedilger/float-cmp,MIT,Mike Dilger fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton +form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Authors futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors futures-core,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-core Authors @@ -107,6 +113,7 @@ gloo-timers,https://github.com/rustwasm/gloo/tree/master/crates/timers,MIT OR Ap h2,https://github.com/hyperium/h2,MIT,"Carl Lerche , Sean McArthur " hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,Amanieu d'Antras headers,https://github.com/hyperium/headers,MIT,Sean McArthur +headers-core,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,Without Boats hermit-abi,https://github.com/hermit-os/hermit-rs,MIT OR Apache-2.0,Stefan Lankes @@ -117,6 +124,7 @@ home,https://github.com/rust-lang/cargo,MIT OR Apache-2.0,Brian Anderson , Carl Lerche , Sean McArthur " http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " +http-body-util,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch httpmock,https://github.com/alexliesenfeld/httpmock,MIT,Alexander Liesenfeld @@ -140,6 +148,7 @@ icu_properties_data,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X P icu_provider,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers icu_provider_macros,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers ident_case,https://github.com/TedDriggs/ident_case,MIT OR Apache-2.0,Ted Driggs +idna,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers idna_adapter,https://github.com/hsivonen/idna_adapter,Apache-2.0 OR MIT,The rust-url developers indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Authors indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors @@ -161,6 +170,7 @@ libredox,https://gitlab.redox-os.org/redox-os/libredox,MIT,4lDO2 <4lDO2@protonma linked-hash-map,https://github.com/contain-rs/linked-hash-map,MIT OR Apache-2.0,"Stepan Koltsov , Andrew Paseltiner " linux-raw-sys,https://github.com/sunfishcode/linux-raw-sys,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,Dan Gohman litemap,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers +lock_api,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras log,https://github.com/rust-lang/log,MIT OR Apache-2.0,The Rust Project Developers lru-cache,https://github.com/contain-rs/lru-cache,MIT OR Apache-2.0,Stepan Koltsov matchers,https://github.com/hawkw/matchers,MIT,Eliza Weisman @@ -184,8 +194,11 @@ ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem parking,https://github.com/smol-rs/parking,Apache-2.0 OR MIT,"Stjepan Glavina , The Rust Project Developers" parking_lot,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras +parking_lot_core,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras parse-display,https://github.com/frozenlib/parse-display,MIT OR Apache-2.0,frozenlib +parse-display-derive,https://github.com/frozenlib/parse-display,MIT OR Apache-2.0,frozenlib paste,https://github.com/dtolnay/paste,MIT OR Apache-2.0,David Tolnay +percent-encoding,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers petgraph,https://github.com/petgraph/petgraph,MIT OR Apache-2.0,"bluss, mitchmindtree" phf_shared,https://github.com/rust-phf/rust-phf,MIT,Steven Fackler pico-args,https://github.com/RazrFalcon/pico-args,MIT,Yevhenii Reizner @@ -200,10 +213,15 @@ ppv-lite86,https://github.com/cryptocorrosion/cryptocorrosion,MIT OR Apache-2.0, precomputed-hash,https://github.com/emilio/precomputed-hash,MIT,Emilio Cobos Álvarez prettyplease,https://github.com/dtolnay/prettyplease,MIT OR Apache-2.0,David Tolnay proc-macro-error,https://gitlab.com/CreepySkeleton/proc-macro-error,MIT OR Apache-2.0,CreepySkeleton +proc-macro-error-attr,https://gitlab.com/CreepySkeleton/proc-macro-error,MIT OR Apache-2.0,CreepySkeleton proc-macro2,https://github.com/dtolnay/proc-macro2,MIT OR Apache-2.0,"David Tolnay , Alex Crichton " prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " +prost-build,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " +prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " +prost-types,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " protobuf,https://github.com/stepancheg/rust-protobuf,MIT,Stepan Koltsov protobuf-parse,https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-parse,MIT,Stepan Koltsov +protobuf-support,https://github.com/stepancheg/rust-protobuf,MIT,Stepan Koltsov quick-error,http://github.com/tailhook/quick-error,MIT OR Apache-2.0,"Paul Colomiets , Colin Kiegel " quinn,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn Authors quinn-proto,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn-proto Authors @@ -212,6 +230,7 @@ quote,https://github.com/dtolnay/quote,MIT OR Apache-2.0,David Tolnay redox_users,https://gitlab.redox-os.org/redox-os/users,MIT,"Jose Narvaez , Wesley Hershberger " @@ -242,9 +261,11 @@ same-file,https://github.com/BurntSushi/same-file,Unlicense OR MIT,Andrew Gallan schannel,https://github.com/steffengy/schannel-rs,MIT,"Steven Fackler , Steffen Butzer " scopeguard,https://github.com/bluss/scopeguard,MIT OR Apache-2.0,bluss security-framework,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler , Kornel " +security-framework-sys,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler , Kornel " semver,https://github.com/dtolnay/semver,MIT OR Apache-2.0,David Tolnay serde,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde_bytes,https://github.com/serde-rs/bytes,MIT OR Apache-2.0,David Tolnay +serde_derive,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde_regex,https://github.com/tailhook/serde-regex,MIT OR Apache-2.0,paul@colomiets.name serde_repr,https://github.com/dtolnay/serde-repr,MIT OR Apache-2.0,David Tolnay @@ -266,6 +287,7 @@ static_assertions,https://github.com/nvzqz/static-assertions-rs,MIT OR Apache-2. string_cache,https://github.com/servo/string-cache,MIT OR Apache-2.0,The Servo Project Developers strsim,https://github.com/rapidfuzz/strsim-rs,MIT,"Danny Guo , maxbachmann " structmeta,https://github.com/frozenlib/structmeta,MIT OR Apache-2.0,frozenlib +structmeta-derive,https://github.com/frozenlib/structmeta,MIT OR Apache-2.0,frozenlib subtle,https://github.com/dalek-cryptography/subtle,BSD-3-Clause,"Isis Lovecruft , Henry de Valence " syn,https://github.com/dtolnay/syn,MIT OR Apache-2.0,David Tolnay sync_wrapper,https://github.com/Actyx/sync_wrapper,Apache-2.0,Actyx AG @@ -275,17 +297,25 @@ term,https://github.com/Stebalien/term,MIT OR Apache-2.0,"The Rust Project Devel termcolor,https://github.com/BurntSushi/termcolor,Unlicense OR MIT,Andrew Gallant testcontainers,https://github.com/testcontainers/testcontainers-rs,MIT OR Apache-2.0,"Thomas Eizinger, Artem Medvedev , Mervyn McCreight" thiserror,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay +thiserror-impl,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanieu d'Antras time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" +time-core,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" +time-macros,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" tiny-keccak,https://github.com/debris/tiny-keccak,CC0-1.0,debris tinybytes,https://github.com/DataDog/libdatadog,Apache-2.0,The tinybytes Authors tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu tokio,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors +tokio-macros,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-rustls Authors +tokio-stream,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors +tokio-util,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors tonic,https://github.com/hyperium/tonic,MIT,Lucio Franco tower,https://github.com/tower-rs/tower,MIT,Tower Maintainers +tower-layer,https://github.com/tower-rs/tower,MIT,Tower Maintainers +tower-service,https://github.com/tower-rs/tower,MIT,Tower Maintainers tracing,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman , Tokio Contributors " tracing-attributes,https://github.com/tokio-rs/tracing,MIT,"Tokio Contributors , Eliza Weisman , David Barsky " tracing-core,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors @@ -322,7 +352,9 @@ webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Aut which,https://github.com/harryfei/which-rs,MIT,Harry Fei widestring,https://github.com/starkat99/widestring-rs,MIT OR Apache-2.0,The widestring Authors winapi,https://github.com/retep998/winapi-rs,MIT OR Apache-2.0,Peter Atashian +winapi-i686-pc-windows-gnu,https://github.com/retep998/winapi-rs,MIT OR Apache-2.0,Peter Atashian winapi-util,https://github.com/BurntSushi/winapi-util,Unlicense OR MIT,Andrew Gallant +winapi-x86_64-pc-windows-gnu,https://github.com/retep998/winapi-rs,MIT OR Apache-2.0,Peter Atashian windows-core,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-implement,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-interface,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft @@ -348,6 +380,8 @@ yoke,https://github.com/unicode-org/icu4x,Unicode-3.0,Manish Goregaokar zerocopy,https://github.com/google/zerocopy,BSD-2-Clause OR Apache-2.0 OR MIT,Joshua Liebow-Feeser zerocopy,https://github.com/google/zerocopy,BSD-2-Clause OR Apache-2.0 OR MIT,"Joshua Liebow-Feeser , Jack Wrenn " +zerocopy-derive,https://github.com/google/zerocopy,BSD-2-Clause OR Apache-2.0 OR MIT,Joshua Liebow-Feeser +zerocopy-derive,https://github.com/google/zerocopy,BSD-2-Clause OR Apache-2.0 OR MIT,"Joshua Liebow-Feeser , Jack Wrenn " zerofrom,https://github.com/unicode-org/icu4x,Unicode-3.0,Manish Goregaokar zerofrom-derive,https://github.com/unicode-org/icu4x,Unicode-3.0,Manish Goregaokar zeroize,https://github.com/RustCrypto/utils/tree/master/zeroize,Apache-2.0 OR MIT,The RustCrypto Project Developers diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 24f7f5f..9a05a5a 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -9,7 +9,7 @@ use env_logger::Builder; use log::{debug, error, info}; -use std::{env, str::FromStr, sync::Arc, sync::Mutex}; +use std::{env, str::FromStr, sync::Arc}; use tokio::{ sync::Mutex as TokioMutex, time::{interval, Duration}, @@ -26,7 +26,7 @@ use datadog_trace_agent::{ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use dogstatsd::{ - aggregator::Aggregator as MetricsAggregator, + aggregator_service::{AggregatorHandle, AggregatorService}, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site}, @@ -138,9 +138,9 @@ pub async fn main() { } }); - let mut metrics_flusher = if dd_use_dogstatsd { + let (mut metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { debug!("Starting dogstatsd"); - let (_, metrics_flusher) = start_dogstatsd( + let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( dd_dogstatsd_port, dd_api_key, dd_site, @@ -149,10 +149,10 @@ pub async fn main() { ) .await; info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); - metrics_flusher + (metrics_flusher, Some(aggregator_handle)) } else { info!("dogstatsd disabled"); - None + (None, None) }; let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); @@ -174,24 +174,28 @@ async fn start_dogstatsd( dd_site: String, https_proxy: Option, dogstatsd_tags: &str, -) -> (CancellationToken, Option) { +) -> (CancellationToken, Option, AggregatorHandle) { + // 1. Create the aggregator service #[allow(clippy::expect_used)] - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new( - SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), - CONTEXTS, - ) - .expect("Failed to create metrics aggregator"), - )); + let (service, handle) = AggregatorService::new( + SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), + CONTEXTS, + ) + .expect("Failed to create aggregator service"); + + // 2. Start the aggregator service in the background + tokio::spawn(service.run()); let dogstatsd_config = DogStatsDConfig { host: AGENT_HOST.to_string(), port, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + + // 3. Use handle in DogStatsD (cheap to clone) let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - Arc::clone(&metrics_aggr), + handle.clone(), dogstatsd_cancel_token.clone(), ) .await; @@ -205,7 +209,7 @@ async fn start_dogstatsd( #[allow(clippy::expect_used)] let metrics_flusher = Flusher::new(FlusherConfig { api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)), - aggregator: Arc::clone(&metrics_aggr), + aggregator_handle: handle.clone(), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( Some(Site::new(dd_site).expect("Failed to parse site")), None, @@ -223,5 +227,5 @@ async fn start_dogstatsd( } }; - (dogstatsd_cancel_token, metrics_flusher) + (dogstatsd_cancel_token, metrics_flusher, handle) } diff --git a/crates/dogstatsd/src/aggregator_service.rs b/crates/dogstatsd/src/aggregator_service.rs new file mode 100644 index 0000000..0422252 --- /dev/null +++ b/crates/dogstatsd/src/aggregator_service.rs @@ -0,0 +1,177 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::aggregator::Aggregator; +use crate::datadog::Series; +use crate::metric::{Metric, SortedTags}; +use datadog_protos::metrics::SketchPayload; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, warn}; + +#[derive(Debug)] +pub enum AggregatorCommand { + InsertBatch(Vec), + Flush(oneshot::Sender), + Shutdown, +} + +#[derive(Debug)] +pub struct FlushResponse { + pub series: Vec, + pub distributions: Vec, +} + +#[derive(Clone)] +pub struct AggregatorHandle { + tx: mpsc::UnboundedSender, +} + +impl AggregatorHandle { + pub fn insert_batch( + &self, + metrics: Vec, + ) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::InsertBatch(metrics)) + } + + pub async fn flush(&self) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::Flush(response_tx)) + .map_err(|e| format!("Failed to send flush command: {}", e))?; + + response_rx + .await + .map_err(|e| format!("Failed to receive flush response: {}", e)) + } + + pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { + self.tx.send(AggregatorCommand::Shutdown) + } +} + +pub struct AggregatorService { + aggregator: Aggregator, + rx: mpsc::UnboundedReceiver, +} + +impl AggregatorService { + pub fn new( + tags: SortedTags, + max_context: usize, + ) -> Result<(Self, AggregatorHandle), crate::errors::Creation> { + let (tx, rx) = mpsc::unbounded_channel(); + let aggregator = Aggregator::new(tags, max_context)?; + + let service = Self { aggregator, rx }; + + let handle = AggregatorHandle { tx }; + + Ok((service, handle)) + } + + pub async fn run(mut self) { + debug!("Aggregator service started"); + + while let Some(command) = self.rx.recv().await { + match command { + AggregatorCommand::InsertBatch(metrics) => { + let mut insert_errors = 0; + for metric in metrics { + // The only possible error here is an overflow + if let Err(_e) = self.aggregator.insert(metric) { + insert_errors += 1; + } + } + if insert_errors > 0 { + warn!("Total of {} metrics failed to insert", insert_errors); + } + } + + AggregatorCommand::Flush(response_tx) => { + let series = self.aggregator.consume_metrics(); + let distributions = self.aggregator.consume_distributions(); + + let response = FlushResponse { + series, + distributions, + }; + + if let Err(_) = response_tx.send(response) { + error!("Failed to send flush response - receiver dropped"); + } + } + + AggregatorCommand::Shutdown => { + debug!("Aggregator service shutting down"); + break; + } + } + } + + debug!("Aggregator service stopped"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metric::{parse, EMPTY_TAGS}; + + #[tokio::test] + async fn test_aggregator_service_basic_flow() { + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service"); + + // Start the service in a background task + let service_task = tokio::spawn(service.run()); + + // Insert some metrics + let metrics = vec![ + parse("test:1|c|#k:v").expect("metric parse failed"), + parse("foo:2|c|#k:v").expect("metric parse failed"), + ]; + + handle + .insert_batch(metrics) + .expect("Failed to insert metrics"); + + // Flush and check results + let response = handle.flush().await.expect("Failed to flush"); + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 2); + + // Shutdown the service + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + } + + #[tokio::test] + async fn test_aggregator_service_distributions() { + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service"); + + // Start the service in a background task + let service_task = tokio::spawn(service.run()); + + // Insert distribution metrics + let metrics = vec![ + parse("dist1:100|d|#k:v").expect("metric parse failed"), + parse("dist2:200|d|#k:v").expect("metric parse failed"), + ]; + + handle + .insert_batch(metrics) + .expect("Failed to insert metrics"); + + // Flush and check results + let response = handle.flush().await.expect("Failed to flush"); + assert_eq!(response.distributions.len(), 1); + assert_eq!(response.distributions[0].sketches.len(), 2); + assert_eq!(response.series.len(), 0); + + // Shutdown the service + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + } +} diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 3e129b2..f16ff0b 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -3,16 +3,15 @@ use std::net::SocketAddr; use std::str::Split; -use std::sync::{Arc, Mutex}; -use crate::aggregator::Aggregator; +use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; use crate::metric::{parse, Metric}; use tracing::{debug, error}; pub struct DogStatsD { cancel_token: tokio_util::sync::CancellationToken, - aggregator: Arc>, + aggregator_handle: AggregatorHandle, buffer_reader: BufferReader, } @@ -52,7 +51,7 @@ impl DogStatsD { #[must_use] pub async fn new( config: &DogStatsDConfig, - aggregator: Arc>, + aggregator_handle: AggregatorHandle, cancel_token: tokio_util::sync::CancellationToken, ) -> DogStatsD { let addr = format!("{}:{}", config.host, config.port); @@ -64,7 +63,7 @@ impl DogStatsD { .expect("couldn't bind to address"); DogStatsD { cancel_token, - aggregator, + aggregator_handle, buffer_reader: BufferReader::UdpSocketReader(socket), } } @@ -119,10 +118,9 @@ impl DogStatsD { }) .collect(); if !all_valid_metrics.is_empty() { - #[allow(clippy::expect_used)] - let mut guarded_aggregator = self.aggregator.lock().expect("lock poisoned"); - for a_valid_value in all_valid_metrics { - let _ = guarded_aggregator.insert(a_valid_value); + // Send metrics through the channel - no lock needed! + if let Err(e) = self.aggregator_handle.insert_batch(all_valid_metrics) { + error!("Failed to send metrics to aggregator: {}", e); } } } @@ -131,62 +129,38 @@ impl DogStatsD { #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { - use crate::aggregator::tests::assert_sketch; - use crate::aggregator::tests::assert_value; - use crate::aggregator::Aggregator; + use crate::aggregator_service::AggregatorService; use crate::dogstatsd::{BufferReader, DogStatsD}; use crate::metric::EMPTY_TAGS; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::sync::{Arc, Mutex}; use tracing_test::traced_test; #[tokio::test] async fn test_dogstatsd_multi_distribution() { - let locked_aggregator = setup_dogstatsd( + let response = setup_and_consume_dogstatsd( "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409 single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409 single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409 ", ) .await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - - let parsed_metrics = aggregator.distributions_to_protobuf(); - - assert_eq!(parsed_metrics.sketches.len(), 3); - assert_eq!(aggregator.to_series().len(), 0); - drop(aggregator); - - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.api.series_v2.payload_size_bytes", - 269_942_f64, - 1656581400, - ); - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.metrics_min_timestamp_latency", - 1_426.908_702_16, - 1656581400, - ); - assert_sketch( - &locked_aggregator, - "single_machine_performance.rouster.metrics_max_timestamp_latency", - 1_376.908_702_16, - 1656581400, - ); + + assert_eq!(response.distributions.len(), 1); + assert_eq!(response.distributions[0].sketches.len(), 3); + assert_eq!(response.series.len(), 0); } #[tokio::test] async fn test_dogstatsd_multi_metric() { - let mut now = std::time::UNIX_EPOCH + let mut now: i64 = std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") .as_secs() .try_into() .unwrap_or_default(); now = (now / 10) * 10; - let locked_aggregator = setup_dogstatsd( + + let response = setup_and_consume_dogstatsd( format!( "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", now @@ -194,69 +168,56 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d .as_str(), ) .await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - - let parsed_metrics = aggregator.to_series(); - - assert_eq!(parsed_metrics.len(), 3); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); - drop(aggregator); - - assert_value(&locked_aggregator, "metric1", 1.0, "", now); - assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now); - assert_value( - &locked_aggregator, - "metric3", - 3.0, - "tag3:val3,tag4:val4", - now, - ); + + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 3); + assert_eq!(response.distributions.len(), 0); } #[tokio::test] async fn test_dogstatsd_single_metric() { - let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); - - assert_eq!(parsed_metrics.len(), 1); - assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); - drop(aggregator); + let response = setup_and_consume_dogstatsd("metric123:99123|c|T1656581409").await; - assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400); + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 1); + assert_eq!(response.distributions.len(), 0); } #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_service_check() { - let locked_aggregator = setup_dogstatsd("_sc|servicecheck|0").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); + let response = setup_and_consume_dogstatsd("_sc|servicecheck|0").await; assert!(!logs_contain("Failed to parse metric")); - assert_eq!(parsed_metrics.len(), 0); + assert_eq!(response.series.len(), 0); + assert_eq!(response.distributions.len(), 0); } #[tokio::test] #[traced_test] async fn test_dogstatsd_filter_event() { - let locked_aggregator = setup_dogstatsd("_e{5,10}:event|test event").await; - let aggregator = locked_aggregator.lock().expect("lock poisoned"); - let parsed_metrics = aggregator.to_series(); + let response = setup_and_consume_dogstatsd("_e{5,10}:event|test event").await; assert!(!logs_contain("Failed to parse metric")); - assert_eq!(parsed_metrics.len(), 0); + assert_eq!(response.series.len(), 0); + assert_eq!(response.distributions.len(), 0); } - async fn setup_dogstatsd(statsd_string: &str) -> Arc> { - let aggregator_arc = Arc::new(Mutex::new( - Aggregator::new(EMPTY_TAGS, 1_024).expect("aggregator creation failed"), - )); + async fn setup_and_consume_dogstatsd( + statsd_string: &str, + ) -> crate::aggregator_service::FlushResponse { + // Create the aggregator service + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); + + // Start the service in a background task + let service_task = tokio::spawn(service.run()); + let cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd = DogStatsD { cancel_token, - aggregator: Arc::clone(&aggregator_arc), + aggregator_handle: handle.clone(), buffer_reader: BufferReader::MirrorReader( statsd_string.as_bytes().to_vec(), SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), @@ -264,6 +225,13 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d }; dogstatsd.consume_statsd().await; - aggregator_arc + // Get the metrics via flush + let response = handle.flush().await.expect("Failed to flush"); + + // Shutdown the service + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + + response } } diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index a66d843..c114405 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -1,11 +1,11 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::aggregator::Aggregator; +use crate::aggregator_service::AggregatorHandle; use crate::api_key::ApiKeyFactory; use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy}; use reqwest::{Response, StatusCode}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use tokio::sync::OnceCell; use tracing::{debug, error}; @@ -18,20 +18,19 @@ pub struct Flusher { https_proxy: Option, timeout: Duration, retry_strategy: RetryStrategy, - aggregator: Arc>, + aggregator_handle: AggregatorHandle, dd_api: OnceCell>, } pub struct FlusherConfig { pub api_key_factory: Arc, - pub aggregator: Arc>, + pub aggregator_handle: AggregatorHandle, pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix, pub https_proxy: Option, pub timeout: Duration, pub retry_strategy: RetryStrategy, } -#[allow(clippy::await_holding_lock)] impl Flusher { pub fn new(config: FlusherConfig) -> Self { Flusher { @@ -40,7 +39,7 @@ impl Flusher { https_proxy: config.https_proxy, timeout: config.timeout, retry_strategy: config.retry_strategy, - aggregator: config.aggregator, + aggregator_handle: config.aggregator_handle, dd_api: OnceCell::new(), } } @@ -73,15 +72,17 @@ impl Flusher { Vec, Vec, )> { - let (series, distributions) = { - #[allow(clippy::expect_used)] - let mut aggregator = self.aggregator.lock().expect("lock poisoned"); - ( - aggregator.consume_metrics(), - aggregator.consume_distributions(), - ) + // Request flush through the channel - no lock needed! + let response = match self.aggregator_handle.flush().await { + Ok(response) => response, + Err(e) => { + error!("Failed to flush aggregator: {}", e); + return Some((Vec::new(), Vec::new())); + } }; - self.flush_metrics(series, distributions).await + + self.flush_metrics(response.series, response.distributions) + .await } /// Flush given batch of metrics diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index 59cbd5b..e30bb86 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -8,6 +8,7 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod aggregator; +pub mod aggregator_service; pub mod api_key; pub mod constants; pub mod datadog; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 10fbc78..4d07f45 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -4,6 +4,7 @@ use dogstatsd::metric::SortedTags; use dogstatsd::{ aggregator::Aggregator as MetricsAggregator, + aggregator_service::{AggregatorHandle, AggregatorService}, api_key::ApiKeyFactory, constants::CONTEXTS, datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride}, @@ -11,7 +12,7 @@ use dogstatsd::{ flusher::{Flusher, FlusherConfig}, }; use mockito::Server; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::{ net::UdpSocket, time::{sleep, timeout, Duration}, @@ -33,18 +34,21 @@ async fn dogstatsd_server_ships_series() { .create_async() .await; - let metrics_aggr = Arc::new(Mutex::new( - MetricsAggregator::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) - .expect("failed to create aggregator"), - )); + // Create the aggregator service + let (service, handle) = + AggregatorService::new(SortedTags::parse("sometkey:somevalue").unwrap(), CONTEXTS) + .expect("failed to create aggregator service"); - let _ = start_dogstatsd(&metrics_aggr).await; + // Start the service in a background task + tokio::spawn(service.run()); + + let _ = start_dogstatsd(handle.clone()).await; let api_key_factory = ApiKeyFactory::new("mock-api-key"); let mut metrics_flusher = Flusher::new(FlusherConfig { api_key_factory: Arc::new(api_key_factory), - aggregator: Arc::clone(&metrics_aggr), + aggregator_handle: handle.clone(), metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new( None, MetricsIntakeUrlPrefixOverride::maybe_new( @@ -84,7 +88,7 @@ async fn dogstatsd_server_ships_series() { } } -async fn start_dogstatsd(metrics_aggr: &Arc>) -> CancellationToken { +async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationToken { let dogstatsd_config = DogStatsDConfig { host: "127.0.0.1".to_string(), port: 18125, @@ -92,7 +96,7 @@ async fn start_dogstatsd(metrics_aggr: &Arc>) -> Cancel let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( &dogstatsd_config, - Arc::clone(metrics_aggr), + aggregator_handle, dogstatsd_cancel_token.clone(), ) .await;