From 8347d19c6eae95138924e0966011309ad36be0f1 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 26 Mar 2025 14:47:31 -0400 Subject: [PATCH 1/8] add serverless metric origins to dogstatsd --- crates/dogstatsd/src/aggregator.rs | 31 +++- crates/dogstatsd/src/datadog.rs | 14 ++ crates/dogstatsd/src/lib.rs | 1 + crates/dogstatsd/src/metric.rs | 21 +++ crates/dogstatsd/src/origin.rs | 246 +++++++++++++++++++++++++++++ 5 files changed, 308 insertions(+), 5 deletions(-) create mode 100644 crates/dogstatsd/src/origin.rs diff --git a/crates/dogstatsd/src/aggregator.rs b/crates/dogstatsd/src/aggregator.rs index f31a3b0..67b9dae 100644 --- a/crates/dogstatsd/src/aggregator.rs +++ b/crates/dogstatsd/src/aggregator.rs @@ -4,14 +4,17 @@ //! The aggregation of metrics. use crate::constants; -use crate::datadog::{self, Metric as MetricToShip, Series}; +use crate::datadog::{ + self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series, +}; use crate::errors; use crate::metric::{self, Metric, MetricValue, SortedTags}; +use crate::origin::find_metric_origin; -use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; +use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload}; use ddsketch_agent::DDSketch; use hashbrown::hash_table; -use protobuf::Message; +use protobuf::{Message, MessageField, SpecialFields}; use tracing::{error, warn}; use ustr::Ustr; @@ -261,6 +264,14 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option base_tag_vec.extend(&tags); } sketch.set_tags(base_tag_vec.to_chars()); + + if let Some(origin) = find_metric_origin(entry, base_tag_vec) { + sketch.set_metadata(Metadata { + origin: MessageField::some(origin), + special_fields: SpecialFields::default(), + }); + } + Some(sketch) } @@ -286,12 +297,22 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option, + /// Optional metadata associated with the metric + pub(crate) metadata: Option, +} + +#[derive(Debug, Serialize)] +pub struct Metadata { + pub(crate) origin: Option, +} + +#[derive(Debug, Serialize)] +pub struct Origin { + pub(crate) origin_product: u32, + pub(crate) origin_sub_product: u32, + pub(crate) origin_product_detail: u32, } #[derive(Debug, Serialize)] diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index 4009db1..968ee54 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -14,3 +14,4 @@ pub mod dogstatsd; pub mod errors; pub mod flusher; pub mod metric; +pub mod origin; diff --git a/crates/dogstatsd/src/metric.rs b/crates/dogstatsd/src/metric.rs index 08d68ab..ce4a685 100644 --- a/crates/dogstatsd/src/metric.rs +++ b/crates/dogstatsd/src/metric.rs @@ -130,6 +130,13 @@ impl SortedTags { tags_as_vec } + pub fn find_all(&self, tag_key: &str) -> Vec<&Ustr> { + self.values + .iter() + .filter_map(|(k, v)| if k == tag_key { Some(v) } else { None }) + .collect() + } + pub(crate) fn to_resources(&self) -> Vec { let mut resources = Vec::with_capacity(constants::MAX_TAGS); for (key, val) in &self.values { @@ -583,6 +590,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tag_no_value() { let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go,lang_version:go1.22.10,_dd.origin:lambda,runtime-id:d66f501c-d09b-4d0d-970f-515235c4eb56,v1.65.1,service:aws.lambda,reason:scheduled"); assert!(result.is_ok()); @@ -596,6 +604,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tag_multi_column() { let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go:and:something:else"); assert!(result.is_ok()); @@ -606,6 +615,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tracer_metric() { let input = "datadog.tracer.flush_duration:0.785551|ms|#lang:go,lang_version:go1.23.2,env:redacted_env,_dd.origin:lambda,runtime-id:redacted_runtime,tracer_version:v1.70.1,service:redacted_service,env:redacted_env,service:redacted_service,version:redacted_version"; let expected_error = "ms".to_string(); @@ -617,6 +627,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_metric_timestamp() { // Important to test that we round down to the nearest 10 seconds // for our buckets @@ -626,6 +637,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_metric_no_timestamp() { // *wince* this could be a race condition // we round the timestamp down to a 10s bucket and I want to test now @@ -653,4 +665,13 @@ mod tests { assert_eq!(first_element.0, Ustr::from("a")); assert_eq!(first_element.1, Ustr::from("a1")); } + + #[test] + fn sorted_tags_find_all() { + let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); + assert_eq!(tags.find_all("a"), vec![&Ustr::from(""), &Ustr::from("1")]); + assert_eq!(tags.find_all("b"), vec![&Ustr::from("2")]); + assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]); + assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new()); + } } diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs new file mode 100644 index 0000000..227dd6d --- /dev/null +++ b/crates/dogstatsd/src/origin.rs @@ -0,0 +1,246 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::metric::{Metric, SortedTags}; +use datadog_protos::metrics::Origin; + +// Metric tag keys +const DD_ORIGIN_TAG_KEY: &str = "origin"; +const AWS_LAMBDA_TAG_KEY: &str = "function_arn"; + +// Metric tag values +const GOOGLE_CLOUD_RUN_TAG_VALUE: &str = "cloudrun"; +const AZURE_APP_SERVICES_TAG_VALUE: &str = "appservice"; +const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp"; +const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; + +// Metric prefixes +const DATADOG_PREFIX: &str = "datadog"; +const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services"; +const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; +const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; + +/// Represents the product origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L161 +pub enum OriginProduct { + Other = 0, + Serverless = 1, +} + +impl From for u32 { + fn from(product: OriginProduct) -> u32 { + product as u32 + } +} + +/// Represents the category origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L276 +pub enum OriginCategory { + Other = 0, + AppServicesMetrics = 35, + CloudRunMetrics = 36, + ContainerAppMetrics = 37, + LambdaMetrics = 38, + AzureFunctionsMetrics = 71, +} + +impl From for u32 { + fn from(category: OriginCategory) -> u32 { + category as u32 + } +} + +/// Represents the service origin of a metric. +/// The full enum is exhaustive so we only include what we need. Please reference the corresponding +/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417 +pub enum OriginService { + Other = 0, +} + +impl From for u32 { + fn from(service: OriginService) -> u32 { + service as u32 + } +} + +/// Struct to hold tag key, tag value, and prefix for matching. +struct MetricOriginCheck { + tag_key: &'static str, + tag_value: &'static str, + prefix: &'static str, +} + +impl MetricOriginCheck { + /// Checks if the tag matches the given key, value, and prefix. + fn matches(&self, tags: &SortedTags, metric_prefix: &str) -> bool { + has_tag_value(tags, self.tag_key, self.tag_value) && metric_prefix != self.prefix + } +} + +const METRIC_ORIGIN_CHECKS: &[MetricOriginCheck] = &[ + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: GOOGLE_CLOUD_RUN_TAG_VALUE, + prefix: GOOGLE_CLOUD_RUN_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_APP_SERVICES_TAG_VALUE, + prefix: AZURE_APP_SERVICES_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_CONTAINER_APP_TAG_VALUE, + prefix: AZURE_CONTAINER_APP_PREFIX, + }, + MetricOriginCheck { + tag_key: DD_ORIGIN_TAG_KEY, + tag_value: AZURE_FUNCTIONS_TAG_VALUE, + prefix: AZURE_FUNCTIONS_PREFIX, + }, + MetricOriginCheck { + tag_key: AWS_LAMBDA_TAG_KEY, + tag_value: "", + prefix: AWS_LAMBDA_PREFIX, + }, +]; + +/// Creates an Origin for serverless metrics. +fn serverless_origin(category: OriginCategory) -> Origin { + Origin { + origin_product: OriginProduct::Serverless.into(), + origin_service: OriginService::Other.into(), + origin_category: category.into(), + ..Default::default() + } +} + +/// Finds the origin of a metric based on its tags and name prefix. +pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { + let metric_name = metric.name.to_string(); + let metric_prefix = metric_name + .split('.') + .take(2) + .collect::>() + .join("."); + + if is_datadog_metric(&metric_prefix) { + return None; + } + + for (index, origin_check) in METRIC_ORIGIN_CHECKS.iter().enumerate() { + if origin_check.matches(&tags, &metric_prefix) { + let category = match index { + 0 => OriginCategory::CloudRunMetrics, + 1 => OriginCategory::AppServicesMetrics, + 2 => OriginCategory::ContainerAppMetrics, + 3 => OriginCategory::AzureFunctionsMetrics, + 4 => OriginCategory::LambdaMetrics, + _ => OriginCategory::Other, + }; + return Some(serverless_origin(category)); + } + } + + None +} + +/// Checks if the given key-value pair exists in the tags. +fn has_tag_value(tags: &SortedTags, key: &str, value: &str) -> bool { + if value.is_empty() { + return !tags.find_all(key).is_empty(); + } + tags.find_all(key) + .iter() + .any(|tag_value| tag_value.as_str() == value) +} + +/// Checks if the metric is a Datadog metric. +fn is_datadog_metric(prefix: &str) -> bool { + prefix == DATADOG_PREFIX +} + +#[cfg(test)] +mod tests { + use crate::metric::MetricValue; + + use super::*; + + #[test] + fn test_origin_product() { + let origin_product: u32 = OriginProduct::Serverless.into(); + assert_eq!(origin_product, 1); + } + + #[test] + fn test_origin_category() { + let origin_category: u32 = OriginCategory::LambdaMetrics.into(); + assert_eq!(origin_category, 38); + } + + #[test] + fn test_origin_service() { + let origin_service: u32 = OriginService::Other.into(); + assert_eq!(origin_service, 0); + } + + #[test] + fn test_find_metric_origin_aws_lambda_standard_metric() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let mut now = 1656581409; + now = (now / 10) * 10; + + let metric = Metric { + id: 0, + name: "aws.lambda.enhanced.invocations".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: now, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!(origin, None); + } + + #[test] + fn test_find_metric_origin_aws_lambda_custom_metric() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; + + let metric = Metric { + id: 0, + name: "my.custom.aws.lambda.invocations".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: now, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!( + origin, + Some(Origin { + origin_product: OriginProduct::Serverless.into(), + origin_category: OriginCategory::LambdaMetrics.into(), + origin_service: OriginService::Other.into(), + ..Default::default() + }) + ); + } + + #[test] + fn test_has_tag_value() { + let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); + assert!(has_tag_value(&tags, "a", "1")); + assert!(has_tag_value(&tags, "b", "2")); + assert!(has_tag_value(&tags, "c", "3")); + assert!(!has_tag_value(&tags, "d", "4")); + } +} From c61f47dbb73d6340aad66472db49035f3fbd27b6 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Fri, 9 May 2025 15:56:57 -0400 Subject: [PATCH 2/8] add details --- crates/dogstatsd/src/origin.rs | 357 +++++++++++++++++++++++---------- 1 file changed, 250 insertions(+), 107 deletions(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 227dd6d..40b0078 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -15,12 +15,14 @@ const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp"; const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; // Metric prefixes -const DATADOG_PREFIX: &str = "datadog"; +const DATADOG_PREFIX: &str = "datadog."; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services"; const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps"; const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; +const JVM_PREFIX: &str = "jvm."; +const RUNTIME_PREFIX: &str = "runtime."; /// Represents the product origin of a metric. /// The full enum is exhaustive so we only include what we need. Please reference the corresponding @@ -59,6 +61,9 @@ impl From for u32 { /// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417 pub enum OriginService { Other = 0, + ServerlessCustom = 472, + ServerlessEnhanced = 473, + ServerlessRuntime = 474, } impl From for u32 { @@ -67,86 +72,57 @@ impl From for u32 { } } -/// Struct to hold tag key, tag value, and prefix for matching. -struct MetricOriginCheck { - tag_key: &'static str, - tag_value: &'static str, - prefix: &'static str, -} - -impl MetricOriginCheck { - /// Checks if the tag matches the given key, value, and prefix. - fn matches(&self, tags: &SortedTags, metric_prefix: &str) -> bool { - has_tag_value(tags, self.tag_key, self.tag_value) && metric_prefix != self.prefix - } -} - -const METRIC_ORIGIN_CHECKS: &[MetricOriginCheck] = &[ - MetricOriginCheck { - tag_key: DD_ORIGIN_TAG_KEY, - tag_value: GOOGLE_CLOUD_RUN_TAG_VALUE, - prefix: GOOGLE_CLOUD_RUN_PREFIX, - }, - MetricOriginCheck { - tag_key: DD_ORIGIN_TAG_KEY, - tag_value: AZURE_APP_SERVICES_TAG_VALUE, - prefix: AZURE_APP_SERVICES_PREFIX, - }, - MetricOriginCheck { - tag_key: DD_ORIGIN_TAG_KEY, - tag_value: AZURE_CONTAINER_APP_TAG_VALUE, - prefix: AZURE_CONTAINER_APP_PREFIX, - }, - MetricOriginCheck { - tag_key: DD_ORIGIN_TAG_KEY, - tag_value: AZURE_FUNCTIONS_TAG_VALUE, - prefix: AZURE_FUNCTIONS_PREFIX, - }, - MetricOriginCheck { - tag_key: AWS_LAMBDA_TAG_KEY, - tag_value: "", - prefix: AWS_LAMBDA_PREFIX, - }, -]; - -/// Creates an Origin for serverless metrics. -fn serverless_origin(category: OriginCategory) -> Origin { - Origin { - origin_product: OriginProduct::Serverless.into(), - origin_service: OriginService::Other.into(), - origin_category: category.into(), - ..Default::default() - } -} - /// Finds the origin of a metric based on its tags and name prefix. pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { let metric_name = metric.name.to_string(); + + // First check if it's a Datadog metric + if metric_name.starts_with(DATADOG_PREFIX) { + return None; + } + let metric_prefix = metric_name .split('.') .take(2) .collect::>() .join("."); - if is_datadog_metric(&metric_prefix) { - return None; - } + // Determine the service based on metric prefix first + let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) + { + OriginService::ServerlessRuntime + } else if metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == AZURE_APP_SERVICES_PREFIX + || metric_prefix == AZURE_CONTAINER_APP_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; - for (index, origin_check) in METRIC_ORIGIN_CHECKS.iter().enumerate() { - if origin_check.matches(&tags, &metric_prefix) { - let category = match index { - 0 => OriginCategory::CloudRunMetrics, - 1 => OriginCategory::AppServicesMetrics, - 2 => OriginCategory::ContainerAppMetrics, - 3 => OriginCategory::AzureFunctionsMetrics, - 4 => OriginCategory::LambdaMetrics, - _ => OriginCategory::Other, - }; - return Some(serverless_origin(category)); - } - } + // Then determine the category based on tags + let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { + OriginCategory::LambdaMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_APP_SERVICES_TAG_VALUE) { + OriginCategory::AppServicesMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, GOOGLE_CLOUD_RUN_TAG_VALUE) { + OriginCategory::CloudRunMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_CONTAINER_APP_TAG_VALUE) { + OriginCategory::ContainerAppMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_FUNCTIONS_TAG_VALUE) { + OriginCategory::AzureFunctionsMetrics + } else { + return None; + }; - None + Some(Origin { + origin_product: OriginProduct::Serverless.into(), + origin_service: service.into(), + origin_category: category.into(), + ..Default::default() + }) } /// Checks if the given key-value pair exists in the tags. @@ -159,11 +135,6 @@ fn has_tag_value(tags: &SortedTags, key: &str, value: &str) -> bool { .any(|tag_value| tag_value.as_str() == value) } -/// Checks if the metric is a Datadog metric. -fn is_datadog_metric(prefix: &str) -> bool { - prefix == DATADOG_PREFIX -} - #[cfg(test)] mod tests { use crate::metric::MetricValue; @@ -184,63 +155,235 @@ mod tests { #[test] fn test_origin_service() { - let origin_service: u32 = OriginService::Other.into(); - assert_eq!(origin_service, 0); + let origin_service: u32 = OriginService::ServerlessRuntime.into(); + assert_eq!(origin_service, 474); } #[test] - fn test_find_metric_origin_aws_lambda_standard_metric() { + fn test_find_metric_origin_lambda_runtime() { let tags = SortedTags::parse("function_arn:hello123").unwrap(); - let mut now = 1656581409; - now = (now / 10) * 10; + let metric = Metric { + id: 0, + name: "runtime.memory.used".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessRuntime as u32 + ); + } + #[test] + fn test_find_metric_origin_lambda_enhanced() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); let metric = Metric { id: 0, name: "aws.lambda.enhanced.invocations".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), - timestamp: now, + timestamp: 0, }; - let origin = find_metric_origin(&metric, tags); - assert_eq!(origin, None); + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); } #[test] - fn test_find_metric_origin_aws_lambda_custom_metric() { + fn test_find_metric_origin_lambda_custom() { let tags = SortedTags::parse("function_arn:hello123").unwrap(); - let mut now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - now = (now / 10) * 10; + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessCustom as u32 + ); + } + #[test] + fn test_find_metric_origin_cloudrun() { + let tags = SortedTags::parse("origin:cloudrun").unwrap(); let metric = Metric { id: 0, - name: "my.custom.aws.lambda.invocations".into(), + name: "gcp.run.requests".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), - timestamp: now, + timestamp: 0, }; - let origin = find_metric_origin(&metric, tags); + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); assert_eq!( - origin, - Some(Origin { - origin_product: OriginProduct::Serverless.into(), - origin_category: OriginCategory::LambdaMetrics.into(), - origin_service: OriginService::Other.into(), - ..Default::default() - }) + origin.origin_category as u32, + OriginCategory::CloudRunMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 ); } #[test] - fn test_has_tag_value() { - let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); - assert!(has_tag_value(&tags, "a", "1")); - assert!(has_tag_value(&tags, "b", "2")); - assert!(has_tag_value(&tags, "c", "3")); - assert!(!has_tag_value(&tags, "d", "4")); + fn test_find_metric_origin_azure_app_services() { + let tags = SortedTags::parse("origin:appservice").unwrap(); + let metric = Metric { + id: 0, + name: "azure.app_services.requests".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AppServicesMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_container_app() { + let tags = SortedTags::parse("origin:containerapp").unwrap(); + let metric = Metric { + id: 0, + name: "azure.app_containerapps.requests".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::ContainerAppMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_functions() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.requests".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_jvm() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "jvm.memory.used".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessRuntime as u32 + ); + } + + #[test] + fn test_find_metric_origin_datadog() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "datadog.agent.running".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!(origin, None); + } + + #[test] + fn test_find_metric_origin_unknown() { + let tags = SortedTags::parse("unknown:tag").unwrap(); + let metric = Metric { + id: 0, + name: "unknown.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags); + assert_eq!(origin, None); } } From 4d4fb057a9499e7d0a58bcbe4c80d4686b7c91c2 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 1 Jul 2025 15:23:05 -0400 Subject: [PATCH 3/8] remove comments referencing internal repo --- crates/dogstatsd/src/origin.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 40b0078..14ae3f5 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -25,8 +25,7 @@ const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; /// Represents the product origin of a metric. -/// The full enum is exhaustive so we only include what we need. Please reference the corresponding -/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L161 +/// The full enum is exhaustive so we only include what we need. pub enum OriginProduct { Other = 0, Serverless = 1, @@ -39,8 +38,7 @@ impl From for u32 { } /// Represents the category origin of a metric. -/// The full enum is exhaustive so we only include what we need. Please reference the corresponding -/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L276 +/// The full enum is exhaustive so we only include what we need. pub enum OriginCategory { Other = 0, AppServicesMetrics = 35, @@ -57,8 +55,7 @@ impl From for u32 { } /// Represents the service origin of a metric. -/// The full enum is exhaustive so we only include what we need. Please reference the corresponding -/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417 +/// The full enum is exhaustive so we only include what we need. pub enum OriginService { Other = 0, ServerlessCustom = 472, From 8904e4ede956f881b2fa907c92f8ca4245fb5235 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 1 Jul 2025 15:55:48 -0400 Subject: [PATCH 4/8] implement Clone for Metadata and Origin --- crates/dogstatsd/src/datadog.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/dogstatsd/src/datadog.rs b/crates/dogstatsd/src/datadog.rs index 21da16f..c2f2b06 100644 --- a/crates/dogstatsd/src/datadog.rs +++ b/crates/dogstatsd/src/datadog.rs @@ -353,12 +353,12 @@ pub(crate) struct Metric { pub(crate) metadata: Option, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] pub struct Metadata { pub(crate) origin: Option, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] pub struct Origin { pub(crate) origin_product: u32, pub(crate) origin_sub_product: u32, From 180ad0e7ca7746a955419dcbcd93653225a2930c Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 2 Jul 2025 16:38:56 -0400 Subject: [PATCH 5/8] remove prefixes for enhanced metrics that don't yet exist --- crates/dogstatsd/src/origin.rs | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 14ae3f5..791e789 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -16,10 +16,6 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; // Metric prefixes const DATADOG_PREFIX: &str = "datadog."; -const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; -const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services"; -const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps"; -const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -88,12 +84,7 @@ pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { OriginService::ServerlessRuntime - } else if metric_prefix == GOOGLE_CLOUD_RUN_PREFIX - || metric_prefix == AWS_LAMBDA_PREFIX - || metric_prefix == AZURE_APP_SERVICES_PREFIX - || metric_prefix == AZURE_CONTAINER_APP_PREFIX - || metric_prefix == AZURE_FUNCTIONS_PREFIX - { + } else if metric_prefix == AWS_LAMBDA_PREFIX { OriginService::ServerlessEnhanced } else { OriginService::ServerlessCustom @@ -236,7 +227,7 @@ mod tests { let tags = SortedTags::parse("origin:cloudrun").unwrap(); let metric = Metric { id: 0, - name: "gcp.run.requests".into(), + name: "my.custom.metric".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), timestamp: 0, @@ -252,7 +243,7 @@ mod tests { ); assert_eq!( origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 + OriginService::ServerlessCustom as u32 ); } @@ -261,7 +252,7 @@ mod tests { let tags = SortedTags::parse("origin:appservice").unwrap(); let metric = Metric { id: 0, - name: "azure.app_services.requests".into(), + name: "my.custom.metric".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), timestamp: 0, @@ -277,7 +268,7 @@ mod tests { ); assert_eq!( origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 + OriginService::ServerlessCustom as u32 ); } @@ -286,7 +277,7 @@ mod tests { let tags = SortedTags::parse("origin:containerapp").unwrap(); let metric = Metric { id: 0, - name: "azure.app_containerapps.requests".into(), + name: "my.custom.metric".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), timestamp: 0, @@ -302,7 +293,7 @@ mod tests { ); assert_eq!( origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 + OriginService::ServerlessCustom as u32 ); } @@ -311,7 +302,7 @@ mod tests { let tags = SortedTags::parse("origin:azurefunction").unwrap(); let metric = Metric { id: 0, - name: "azure.functions.requests".into(), + name: "my.custom.metric".into(), value: MetricValue::Gauge(1.0), tags: Some(tags.clone()), timestamp: 0, @@ -327,7 +318,7 @@ mod tests { ); assert_eq!( origin.origin_service as u32, - OriginService::ServerlessEnhanced as u32 + OriginService::ServerlessCustom as u32 ); } From 74af5f62a381fad6d4248373864a2189a2f911d1 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 9 Jul 2025 09:54:30 -0400 Subject: [PATCH 6/8] add back logic for gcp.run enhanced metrics --- crates/dogstatsd/src/origin.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 791e789..a54bdc8 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -17,6 +17,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; // Metric prefixes const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; +const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -84,7 +85,7 @@ pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX { + } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { OriginService::ServerlessEnhanced } else { OriginService::ServerlessCustom @@ -223,7 +224,32 @@ mod tests { } #[test] - fn test_find_metric_origin_cloudrun() { + fn test_find_metric_origin_cloudrun_enhanced() { + let tags = SortedTags::parse("origin:cloudrun").unwrap(); + let metric = Metric { + id: 0, + name: "gcp.run.enhanced.cold_start".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = find_metric_origin(&metric, tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::CloudRunMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_cloudrun_custom() { let tags = SortedTags::parse("origin:cloudrun").unwrap(); let metric = Metric { id: 0, From 5de9cd42f41197a461a404a44279a38098ee75c9 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 9 Jul 2025 12:17:41 -0400 Subject: [PATCH 7/8] convert find_metric_origin to struct method --- crates/dogstatsd/src/aggregator.rs | 5 +- crates/dogstatsd/src/origin.rs | 112 +++++++++++++++-------------- 2 files changed, 59 insertions(+), 58 deletions(-) diff --git a/crates/dogstatsd/src/aggregator.rs b/crates/dogstatsd/src/aggregator.rs index 67b9dae..7fd17e7 100644 --- a/crates/dogstatsd/src/aggregator.rs +++ b/crates/dogstatsd/src/aggregator.rs @@ -9,7 +9,6 @@ use crate::datadog::{ }; use crate::errors; use crate::metric::{self, Metric, MetricValue, SortedTags}; -use crate::origin::find_metric_origin; use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload}; use ddsketch_agent::DDSketch; @@ -265,7 +264,7 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option } sketch.set_tags(base_tag_vec.to_chars()); - if let Some(origin) = find_metric_origin(entry, base_tag_vec) { + if let Some(origin) = entry.find_origin(base_tag_vec) { sketch.set_metadata(Metadata { origin: MessageField::some(origin), special_fields: SpecialFields::default(), @@ -297,7 +296,7 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option for u32 { } } -/// Finds the origin of a metric based on its tags and name prefix. -pub fn find_metric_origin(metric: &Metric, tags: SortedTags) -> Option { - let metric_name = metric.name.to_string(); +impl Metric { + /// Finds the origin of a metric based on its tags and name prefix. + pub fn find_origin(&self, tags: SortedTags) -> Option { + let metric_name = self.name.to_string(); + + // First check if it's a Datadog metric + if metric_name.starts_with(DATADOG_PREFIX) { + return None; + } + + let metric_prefix = metric_name + .split('.') + .take(2) + .collect::>() + .join("."); + + // Determine the service based on metric prefix first + let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) + { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; - // First check if it's a Datadog metric - if metric_name.starts_with(DATADOG_PREFIX) { - return None; - } + // Then determine the category based on tags + let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { + OriginCategory::LambdaMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_APP_SERVICES_TAG_VALUE) { + OriginCategory::AppServicesMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, GOOGLE_CLOUD_RUN_TAG_VALUE) { + OriginCategory::CloudRunMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_CONTAINER_APP_TAG_VALUE) { + OriginCategory::ContainerAppMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_FUNCTIONS_TAG_VALUE) { + OriginCategory::AzureFunctionsMetrics + } else { + return None; + }; - let metric_prefix = metric_name - .split('.') - .take(2) - .collect::>() - .join("."); - - // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) - { - OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; - - // Then determine the category based on tags - let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { - OriginCategory::LambdaMetrics - } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_APP_SERVICES_TAG_VALUE) { - OriginCategory::AppServicesMetrics - } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, GOOGLE_CLOUD_RUN_TAG_VALUE) { - OriginCategory::CloudRunMetrics - } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_CONTAINER_APP_TAG_VALUE) { - OriginCategory::ContainerAppMetrics - } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_FUNCTIONS_TAG_VALUE) { - OriginCategory::AzureFunctionsMetrics - } else { - return None; - }; - - Some(Origin { - origin_product: OriginProduct::Serverless.into(), - origin_service: service.into(), - origin_category: category.into(), - ..Default::default() - }) + Some(Origin { + origin_product: OriginProduct::Serverless.into(), + origin_service: service.into(), + origin_category: category.into(), + ..Default::default() + }) + } } /// Checks if the given key-value pair exists in the tags. @@ -158,7 +160,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -183,7 +185,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -208,7 +210,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -233,7 +235,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -258,7 +260,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -283,7 +285,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -308,7 +310,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -333,7 +335,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -358,7 +360,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags).unwrap(); + let origin = metric.find_origin(tags).unwrap(); assert_eq!( origin.origin_product as u32, OriginProduct::Serverless as u32 @@ -383,7 +385,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags); + let origin = metric.find_origin(tags); assert_eq!(origin, None); } @@ -397,7 +399,7 @@ mod tests { tags: Some(tags.clone()), timestamp: 0, }; - let origin = find_metric_origin(&metric, tags); + let origin = metric.find_origin(tags); assert_eq!(origin, None); } } From 076c45f2b66818ec8d43a878397f886541a74498 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 9 Jul 2025 12:20:14 -0400 Subject: [PATCH 8/8] apply formatting --- crates/dogstatsd/src/origin.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index 5865e19..d0c0952 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -83,7 +83,8 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) + let service = if metric_name.starts_with(JVM_PREFIX) + || metric_name.starts_with(RUNTIME_PREFIX) { OriginService::ServerlessRuntime } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX {