Skip to content
30 changes: 25 additions & 5 deletions crates/dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
//! The aggregation of metrics.

use crate::constants;
use crate::datadog::{self, Metric as MetricToShip, Series};
use crate::datadog::{
self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series,
};
use crate::errors;
use crate::metric::{self, Metric, MetricValue, SortedTags};

use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload};
use ddsketch_agent::DDSketch;
use hashbrown::hash_table;
use protobuf::Message;
use protobuf::{Message, MessageField, SpecialFields};
use tracing::{error, warn};
use ustr::Ustr;

Expand Down Expand Up @@ -261,6 +263,14 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch>
base_tag_vec.extend(&tags);
}
sketch.set_tags(base_tag_vec.to_chars());

if let Some(origin) = entry.find_origin(base_tag_vec) {
sketch.set_metadata(Metadata {
origin: MessageField::some(origin),
special_fields: SpecialFields::default(),
});
}

Some(sketch)
}

Expand All @@ -286,12 +296,22 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricTo
base_tag_vec.extend(&tags);
}

let origin = entry.find_origin(base_tag_vec.clone());
let metadata = origin.map(|o| MetadataToShip {
origin: Some(OriginToShip {
origin_product: o.origin_product,
origin_sub_product: o.origin_category,
origin_product_detail: o.origin_service,
}),
});

Some(MetricToShip {
metric: entry.name.as_str(),
resources,
kind,
points: [point; 1],
tags: base_tag_vec.to_strings(),
metadata,
})
}

Expand All @@ -308,7 +328,7 @@ pub mod tests {

const PRECISION: f64 = 0.000_000_01;

const SINGLE_METRIC_SIZE: usize = 193; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_METRIC_SIZE: usize = 209; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_DISTRIBUTION_SIZE: u64 = 140;
const DEFAULT_TAGS: &str =
"dd_extension_version:63-next,architecture:x86_64,_dd.compute_stats:1";
Expand Down Expand Up @@ -673,7 +693,7 @@ pub mod tests {
fn consume_metrics_batch_bytes() {
let expected_metrics_per_batch = 2;
let total_number_of_metrics = 5;
let two_metrics_size = 374;
let two_metrics_size = 406;
let max_bytes = SINGLE_METRIC_SIZE * expected_metrics_per_batch + 13;
let mut aggregator = Aggregator {
tags: to_sorted_tags(),
Expand Down
14 changes: 14 additions & 0 deletions crates/dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,20 @@ pub(crate) struct Metric {
/// The kind of metric
pub(crate) kind: DdMetricKind,
pub(crate) tags: Vec<String>,
/// Optional metadata associated with the metric
pub(crate) metadata: Option<Metadata>,
}

#[derive(Debug, Serialize, Clone)]
pub struct Metadata {
pub(crate) origin: Option<Origin>,
}

#[derive(Debug, Serialize, Clone)]
pub struct Origin {
pub(crate) origin_product: u32,
pub(crate) origin_sub_product: u32,
pub(crate) origin_product_detail: u32,
}

#[derive(Debug, Serialize, Clone)]
Expand Down
1 change: 1 addition & 0 deletions crates/dogstatsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub mod dogstatsd;
pub mod errors;
pub mod flusher;
pub mod metric;
pub mod origin;
21 changes: 21 additions & 0 deletions crates/dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ impl SortedTags {
tags_as_vec
}

pub fn find_all(&self, tag_key: &str) -> Vec<&Ustr> {
self.values
.iter()
.filter_map(|(k, v)| if k == tag_key { Some(v) } else { None })
.collect()
}

pub(crate) fn to_resources(&self) -> Vec<datadog::Resource> {
let mut resources = Vec::with_capacity(constants::MAX_TAGS);
for (key, val) in &self.values {
Expand Down Expand Up @@ -586,6 +593,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_tag_no_value() {
let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go,lang_version:go1.22.10,_dd.origin:lambda,runtime-id:d66f501c-d09b-4d0d-970f-515235c4eb56,v1.65.1,service:aws.lambda,reason:scheduled");
assert!(result.is_ok());
Expand All @@ -599,6 +607,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_tag_multi_column() {
let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go:and:something:else");
assert!(result.is_ok());
Expand All @@ -609,6 +618,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_tracer_metric() {
let input = "datadog.tracer.flush_duration:0.785551|ms|#lang:go,lang_version:go1.23.2,env:redacted_env,_dd.origin:lambda,runtime-id:redacted_runtime,tracer_version:v1.70.1,service:redacted_service,env:redacted_env,service:redacted_service,version:redacted_version";
let expected_error = "ms".to_string();
Expand All @@ -620,6 +630,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_metric_timestamp() {
// Important to test that we round down to the nearest 10 seconds
// for our buckets
Expand All @@ -636,6 +647,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_metric_no_timestamp() {
// *wince* this could be a race condition
// we round the timestamp down to a 10s bucket and I want to test now
Expand Down Expand Up @@ -663,4 +675,13 @@ mod tests {
assert_eq!(first_element.0, Ustr::from("a"));
assert_eq!(first_element.1, Ustr::from("a1"));
}

#[test]
fn sorted_tags_find_all() {
let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap();
assert_eq!(tags.find_all("a"), vec![&Ustr::from(""), &Ustr::from("1")]);
assert_eq!(tags.find_all("b"), vec![&Ustr::from("2")]);
assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]);
assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new());
}
}
Loading
Loading