diff --git a/crates/dogstatsd/src/aggregator.rs b/crates/dogstatsd/src/aggregator.rs index f31a3b0..7fd17e7 100644 --- a/crates/dogstatsd/src/aggregator.rs +++ b/crates/dogstatsd/src/aggregator.rs @@ -4,14 +4,16 @@ //! The aggregation of metrics. use crate::constants; -use crate::datadog::{self, Metric as MetricToShip, Series}; +use crate::datadog::{ + self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series, +}; use crate::errors; use crate::metric::{self, Metric, MetricValue, SortedTags}; -use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; +use datadog_protos::metrics::{Dogsketch, Metadata, Sketch, SketchPayload}; use ddsketch_agent::DDSketch; use hashbrown::hash_table; -use protobuf::Message; +use protobuf::{Message, MessageField, SpecialFields}; use tracing::{error, warn}; use ustr::Ustr; @@ -261,6 +263,14 @@ fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option base_tag_vec.extend(&tags); } sketch.set_tags(base_tag_vec.to_chars()); + + if let Some(origin) = entry.find_origin(base_tag_vec) { + sketch.set_metadata(Metadata { + origin: MessageField::some(origin), + special_fields: SpecialFields::default(), + }); + } + Some(sketch) } @@ -286,12 +296,22 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option, + /// Optional metadata associated with the metric + pub(crate) metadata: Option, +} + +#[derive(Debug, Serialize, Clone)] +pub struct Metadata { + pub(crate) origin: Option, +} + +#[derive(Debug, Serialize, Clone)] +pub struct Origin { + pub(crate) origin_product: u32, + pub(crate) origin_sub_product: u32, + pub(crate) origin_product_detail: u32, } #[derive(Debug, Serialize, Clone)] diff --git a/crates/dogstatsd/src/lib.rs b/crates/dogstatsd/src/lib.rs index 6aea675..59cbd5b 100644 --- a/crates/dogstatsd/src/lib.rs +++ b/crates/dogstatsd/src/lib.rs @@ -15,3 +15,4 @@ pub mod dogstatsd; pub mod errors; pub mod flusher; pub mod metric; +pub mod origin; diff --git 
a/crates/dogstatsd/src/metric.rs b/crates/dogstatsd/src/metric.rs index c7d9e37..71e6099 100644 --- a/crates/dogstatsd/src/metric.rs +++ b/crates/dogstatsd/src/metric.rs @@ -130,6 +130,13 @@ impl SortedTags { tags_as_vec } + pub fn find_all(&self, tag_key: &str) -> Vec<&Ustr> { + self.values + .iter() + .filter_map(|(k, v)| if k == tag_key { Some(v) } else { None }) + .collect() + } + pub(crate) fn to_resources(&self) -> Vec { let mut resources = Vec::with_capacity(constants::MAX_TAGS); for (key, val) in &self.values { @@ -586,6 +593,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tag_no_value() { let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go,lang_version:go1.22.10,_dd.origin:lambda,runtime-id:d66f501c-d09b-4d0d-970f-515235c4eb56,v1.65.1,service:aws.lambda,reason:scheduled"); assert!(result.is_ok()); @@ -599,6 +607,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tag_multi_column() { let result = parse("datadog.tracer.flush_triggered:1|c|#lang:go:and:something:else"); assert!(result.is_ok()); @@ -609,6 +618,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_tracer_metric() { let input = "datadog.tracer.flush_duration:0.785551|ms|#lang:go,lang_version:go1.23.2,env:redacted_env,_dd.origin:lambda,runtime-id:redacted_runtime,tracer_version:v1.70.1,service:redacted_service,env:redacted_env,service:redacted_service,version:redacted_version"; let expected_error = "ms".to_string(); @@ -620,6 +630,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_metric_timestamp() { // Important to test that we round down to the nearest 10 seconds // for our buckets @@ -636,6 +647,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn parse_metric_no_timestamp() { // *wince* this could be a race condition // we round the timestamp down to a 10s bucket and I want to test now @@ -663,4 +675,13 @@ mod tests { assert_eq!(first_element.0, Ustr::from("a")); assert_eq!(first_element.1, Ustr::from("a1")); } + + #[test] 
+ fn sorted_tags_find_all() { + let tags = SortedTags::parse("a,a:1,b:2,c:3").unwrap(); + assert_eq!(tags.find_all("a"), vec![&Ustr::from(""), &Ustr::from("1")]); + assert_eq!(tags.find_all("b"), vec![&Ustr::from("2")]); + assert_eq!(tags.find_all("c"), vec![&Ustr::from("3")]); + assert_eq!(tags.find_all("d"), Vec::<&Ustr>::new()); + } } diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs new file mode 100644 index 0000000..d0c0952 --- /dev/null +++ b/crates/dogstatsd/src/origin.rs @@ -0,0 +1,406 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::metric::{Metric, SortedTags}; +use datadog_protos::metrics::Origin; + +// Metric tag keys +const DD_ORIGIN_TAG_KEY: &str = "origin"; +const AWS_LAMBDA_TAG_KEY: &str = "function_arn"; + +// Metric tag values +const GOOGLE_CLOUD_RUN_TAG_VALUE: &str = "cloudrun"; +const AZURE_APP_SERVICES_TAG_VALUE: &str = "appservice"; +const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp"; +const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; + +// Metric prefixes +const DATADOG_PREFIX: &str = "datadog."; +const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; +const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const JVM_PREFIX: &str = "jvm."; +const RUNTIME_PREFIX: &str = "runtime."; + +/// Represents the product origin of a metric. +/// The full enum is exhaustive so we only include what we need. +pub enum OriginProduct { + Other = 0, + Serverless = 1, +} + +impl From<OriginProduct> for u32 { + fn from(product: OriginProduct) -> u32 { + product as u32 + } +} + +/// Represents the category origin of a metric. +/// The full enum is exhaustive so we only include what we need. 
+pub enum OriginCategory { + Other = 0, + AppServicesMetrics = 35, + CloudRunMetrics = 36, + ContainerAppMetrics = 37, + LambdaMetrics = 38, + AzureFunctionsMetrics = 71, +} + +impl From<OriginCategory> for u32 { + fn from(category: OriginCategory) -> u32 { + category as u32 + } +} + +/// Represents the service origin of a metric. +/// The full enum is exhaustive so we only include what we need. +pub enum OriginService { + Other = 0, + ServerlessCustom = 472, + ServerlessEnhanced = 473, + ServerlessRuntime = 474, +} + +impl From<OriginService> for u32 { + fn from(service: OriginService) -> u32 { + service as u32 + } +} + +impl Metric { + /// Finds the origin of a metric based on its tags and name prefix. + pub fn find_origin(&self, tags: SortedTags) -> Option<Origin> { + let metric_name = self.name.to_string(); + + // First check if it's a Datadog metric + if metric_name.starts_with(DATADOG_PREFIX) { + return None; + } + + let metric_prefix = metric_name + .split('.') + .take(2) + .collect::<Vec<&str>>() + .join("."); + + // Determine the service based on metric prefix first + let service = if metric_name.starts_with(JVM_PREFIX) + || metric_name.starts_with(RUNTIME_PREFIX) + { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; + + // Then determine the category based on tags + let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { + OriginCategory::LambdaMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_APP_SERVICES_TAG_VALUE) { + OriginCategory::AppServicesMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, GOOGLE_CLOUD_RUN_TAG_VALUE) { + OriginCategory::CloudRunMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_CONTAINER_APP_TAG_VALUE) { + OriginCategory::ContainerAppMetrics + } else if has_tag_value(&tags, DD_ORIGIN_TAG_KEY, AZURE_FUNCTIONS_TAG_VALUE) { + OriginCategory::AzureFunctionsMetrics + } else 
{ + return None; + }; + + Some(Origin { + origin_product: OriginProduct::Serverless.into(), + origin_service: service.into(), + origin_category: category.into(), + ..Default::default() + }) + } +} + +/// Checks if the given key-value pair exists in the tags. +fn has_tag_value(tags: &SortedTags, key: &str, value: &str) -> bool { + if value.is_empty() { + return !tags.find_all(key).is_empty(); + } + tags.find_all(key) + .iter() + .any(|tag_value| tag_value.as_str() == value) +} + +#[cfg(test)] +mod tests { + use crate::metric::MetricValue; + + use super::*; + + #[test] + fn test_origin_product() { + let origin_product: u32 = OriginProduct::Serverless.into(); + assert_eq!(origin_product, 1); + } + + #[test] + fn test_origin_category() { + let origin_category: u32 = OriginCategory::LambdaMetrics.into(); + assert_eq!(origin_category, 38); + } + + #[test] + fn test_origin_service() { + let origin_service: u32 = OriginService::ServerlessRuntime.into(); + assert_eq!(origin_service, 474); + } + + #[test] + fn test_find_metric_origin_lambda_runtime() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "runtime.memory.used".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessRuntime as u32 + ); + } + + #[test] + fn test_find_metric_origin_lambda_enhanced() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "aws.lambda.enhanced.invocations".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + 
OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_lambda_custom() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessCustom as u32 + ); + } + + #[test] + fn test_find_metric_origin_cloudrun_enhanced() { + let tags = SortedTags::parse("origin:cloudrun").unwrap(); + let metric = Metric { + id: 0, + name: "gcp.run.enhanced.cold_start".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::CloudRunMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_cloudrun_custom() { + let tags = SortedTags::parse("origin:cloudrun").unwrap(); + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::CloudRunMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + 
OriginService::ServerlessCustom as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_app_services() { + let tags = SortedTags::parse("origin:appservice").unwrap(); + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AppServicesMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessCustom as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_container_app() { + let tags = SortedTags::parse("origin:containerapp").unwrap(); + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::ContainerAppMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessCustom as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_functions() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "my.custom.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessCustom as u32 + ); + } + + #[test] + fn test_find_metric_origin_jvm() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = 
Metric { + id: 0, + name: "jvm.memory.used".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::LambdaMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessRuntime as u32 + ); + } + + #[test] + fn test_find_metric_origin_datadog() { + let tags = SortedTags::parse("function_arn:hello123").unwrap(); + let metric = Metric { + id: 0, + name: "datadog.agent.running".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags); + assert_eq!(origin, None); + } + + #[test] + fn test_find_metric_origin_unknown() { + let tags = SortedTags::parse("unknown:tag").unwrap(); + let metric = Metric { + id: 0, + name: "unknown.metric".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags); + assert_eq!(origin, None); + } +}