From f015b299b0249d082f297f7aee15f42ae091c77b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 27 Jul 2023 13:41:10 -0400 Subject: [PATCH] feat: Refactor TraceEvent insert to use TargetPath compatible types (#18090) --- lib/vector-core/src/event/trace.rs | 10 ++--- src/sinks/datadog/traces/tests.rs | 17 ++++---- src/source_sender/mod.rs | 3 +- src/sources/datadog_agent/mod.rs | 15 ++++--- src/sources/datadog_agent/traces.rs | 55 +++++++++++++++----------- src/sources/http_client/client.rs | 7 +--- src/test_util/mock/transforms/basic.rs | 6 +-- src/transforms/sample.rs | 9 ++++- 8 files changed, 66 insertions(+), 56 deletions(-) diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 21da7413dbc4a..48744bc73f0af 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -85,18 +85,14 @@ impl TraceEvent { self.0.contains(key.as_ref()) } - // TODO This should eventually use TargetPath for the `key` parameter. - // https://github.com/vectordotdev/vector/issues/18059 - pub fn insert( + pub fn insert<'a>( &mut self, - key: impl AsRef, + key: impl TargetPath<'a>, value: impl Into + Debug, ) -> Option { - self.0.insert(key.as_ref(), value.into()) + self.0.insert(key, value.into()) } - // TODO Audit code and use this if possible. - // https://github.com/vectordotdev/vector/issues/18059 pub fn maybe_insert<'a, F: FnOnce() -> Value>( &mut self, prefix: PathPrefix, diff --git a/src/sinks/datadog/traces/tests.rs b/src/sinks/datadog/traces/tests.rs index 1ba7468c7d9bb..af9723c236970 100644 --- a/src/sinks/datadog/traces/tests.rs +++ b/src/sinks/datadog/traces/tests.rs @@ -9,6 +9,7 @@ use ordered_float::NotNan; use prost::Message; use rmp_serde; use vector_core::event::{BatchNotifier, BatchStatus, Event}; +use vrl::event_path; use super::{apm_stats::StatsPayload, dd_proto, ddsketch_full, DatadogTracesConfig}; @@ -95,15 +96,15 @@ fn simple_span(resource: String) -> BTreeMap { pub fn simple_trace_event(resource: String) -> TraceEvent { let mut t = TraceEvent::default(); - t.insert("language", "a_language"); - t.insert("agent_version", "1.23456"); - t.insert("host", "a_host"); - t.insert("env", "an_env"); - t.insert("trace_id", Value::Integer(123)); - t.insert("target_tps", Value::Integer(10)); - t.insert("error_tps", Value::Integer(5)); + t.insert(event_path!("language"), "a_language"); + t.insert(event_path!("agent_version"), "1.23456"); + t.insert(event_path!("host"), "a_host"); + t.insert(event_path!("env"), "an_env"); + t.insert(event_path!("trace_id"), Value::Integer(123)); + t.insert(event_path!("target_tps"), Value::Integer(10)); + t.insert(event_path!("error_tps"), Value::Integer(5)); t.insert( - "spans", + event_path!("spans"), Value::Array(vec![Value::from(simple_span(resource))]), ); t diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index fea4a3980b64d..2869152f45e54 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -391,6 +391,7 @@ mod tests { use chrono::{DateTime, Duration}; use rand::{thread_rng, Rng}; use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent}; + use vrl::event_path; use super::*; use crate::metrics::{self, Controller}; @@ -424,7 +425,7 @@ mod tests { async fn emits_lag_time_for_trace() { emit_and_test(|timestamp| { let mut trace = TraceEvent::default(); - trace.insert("timestamp", timestamp); + trace.insert(event_path!("timestamp"), timestamp); Event::Trace(trace) }) .await; diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index f089b21844117..04614f1ac25c9 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -34,6 +34,7 @@ use vector_common::internal_event::{EventsReceived, Registered}; use vector_config::configurable_component; use vector_core::config::{LegacyKey, LogNamespace}; use vector_core::event::{BatchNotifier, BatchStatus}; +use vrl::path::OwnedTargetPath; use vrl::value::Kind; use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply}; @@ -283,8 +284,8 @@ pub struct ApiKeyQueryParams { #[derive(Clone)] pub(crate) struct DatadogAgentSource { pub(crate) api_key_extractor: ApiKeyExtractor, - pub(crate) log_schema_host_key: String, - pub(crate) log_schema_source_type_key: String, + pub(crate) log_schema_host_key: OwnedTargetPath, + pub(crate) log_schema_source_type_key: OwnedTargetPath, pub(crate) log_namespace: LogNamespace, pub(crate) decoder: Decoder, protocol: &'static str, @@ -334,11 +335,13 @@ impl DatadogAgentSource { .expect("static regex always compiles"), }, log_schema_host_key: log_schema() - .host_key() - .map_or("".to_string(), |key| key.to_string()), + .host_key_target_path() + .expect("global log_schema.host_key to be valid path") + .clone(), log_schema_source_type_key: log_schema() - .source_type_key() - .map_or("".to_string(), |key| key.to_string()), + .source_type_key_target_path() + .expect("global log_schema.source_type_key to be valid path") + .clone(), decoder, protocol, logs_schema_definition: Arc::new(logs_schema_definition), diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index da42dfa23a05e..ec5a56636169f 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -8,6 +8,7 @@ use ordered_float::NotNan; use prost::Message; use vector_common::internal_event::{CountByteSize, InternalEventHandle as _}; use vector_core::EstimatedJsonEncodedSizeOf; +use vrl::event_path; use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter, Rejection, Reply}; use crate::{ @@ -142,11 +143,11 @@ fn handle_dd_trace_payload_v1( .set_datadog_api_key(Arc::clone(k)); } trace_event.insert( - source.log_schema_source_type_key.as_str(), + &source.log_schema_source_type_key, Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v2".to_string()); - trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone()); + trace_event.insert(&source.log_schema_host_key, hostname.clone()); trace_event.insert("env", env.clone()); trace_event.insert("agent_version", agent_version.clone()); trace_event.insert("target_tps", target_tps); @@ -169,15 +170,15 @@ fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec Vec>(), ); - trace_event.insert("container_id", payload.container_id.clone()); - trace_event.insert("language_name", payload.language_name.clone()); - trace_event.insert("language_version", payload.language_version.clone()); - trace_event.insert("tracer_version", payload.tracer_version.clone()); - trace_event.insert("runtime_id", payload.runtime_id.clone()); - trace_event.insert("app_version", payload.app_version.clone()); + trace_event.insert(event_path!("container_id"), payload.container_id.clone()); + trace_event.insert(event_path!("language_name"), payload.language_name.clone()); + trace_event.insert( + event_path!("language_version"), + payload.language_version.clone(), + ); + trace_event.insert( + event_path!("tracer_version"), + payload.tracer_version.clone(), + ); + trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone()); + trace_event.insert(event_path!("app_version"), payload.app_version.clone()); trace_event }) .collect() @@ -217,11 +224,11 @@ fn handle_dd_trace_payload_v0( // TODO trace_id is being forced into an i64 but // the incoming payload is u64. This is a bug and needs to be fixed per: // https://github.com/vectordotdev/vector/issues/14687 - trace_event.insert("trace_id", dd_trace.trace_id as i64); - trace_event.insert("start_time", Utc.timestamp_nanos(dd_trace.start_time)); - trace_event.insert("end_time", Utc.timestamp_nanos(dd_trace.end_time)); + trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64); + trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time)); + trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time)); trace_event.insert( - "spans", + event_path!("spans"), dd_trace .spans .into_iter() @@ -233,8 +240,8 @@ fn handle_dd_trace_payload_v0( //... and each APM event is also mapped into its own event .chain(decoded_payload.transactions.into_iter().map(|s| { let mut trace_event = TraceEvent::default(); - trace_event.insert("spans", vec![Value::from(convert_span(s))]); - trace_event.insert("dropped", true); + trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]); + trace_event.insert(event_path!("dropped"), true); trace_event })).collect(); @@ -252,15 +259,15 @@ fn handle_dd_trace_payload_v0( .set_datadog_api_key(Arc::clone(k)); } if let Some(lang) = lang { - trace_event.insert("language_name", lang.clone()); + trace_event.insert(event_path!("language_name"), lang.clone()); } trace_event.insert( - source.log_schema_source_type_key.as_str(), + &source.log_schema_source_type_key, Bytes::from("datadog_agent"), ); - trace_event.insert("payload_version", "v1".to_string()); - trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone()); - trace_event.insert("env", env.clone()); + trace_event.insert(event_path!("payload_version"), "v1".to_string()); + trace_event.insert(&source.log_schema_host_key, hostname.clone()); + trace_event.insert(event_path!("env"), env.clone()); Event::Trace(trace_event) }) .collect(); diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 481f1c697af84..a665a1c09c3e9 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -350,11 +350,8 @@ impl http_client::HttpClientContext for HttpClientContext { } } Event::Trace(ref mut trace) => { - if let Some(source_type_key) = log_schema().source_type_key() { - trace.insert( - source_type_key.to_string(), - Bytes::from(HttpClientConfig::NAME), - ); + if let Some(source_type_key) = log_schema().source_type_key_target_path() { + trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME)); } } } diff --git a/src/test_util/mock/transforms/basic.rs b/src/test_util/mock/transforms/basic.rs index c7f877eabb244..1d9b65a5b27e3 100644 --- a/src/test_util/mock/transforms/basic.rs +++ b/src/test_util/mock/transforms/basic.rs @@ -118,14 +118,14 @@ impl FunctionTransform for BasicTransform { } } Event::Trace(trace) => { - if let Some(message_key) = crate::config::log_schema().message_key() { + if let Some(message_key) = crate::config::log_schema().message_key_target_path() { let mut v = trace - .get((PathPrefix::Event, message_key)) + .get(message_key) .unwrap() .to_string_lossy() .into_owned(); v.push_str(&self.suffix); - trace.insert(message_key.to_string().as_str(), Value::from(v)); + trace.insert(message_key, Value::from(v)); } } }; diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index c919e9e9bad11..53ca4847f5723 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -1,5 +1,6 @@ use vector_config::configurable_component; use vector_core::config::LogNamespace; +use vrl::event_path; use crate::{ conditions::{AnyCondition, Condition}, @@ -145,8 +146,12 @@ impl FunctionTransform for Sample { if num % self.rate == 0 { match event { - Event::Log(ref mut event) => event.insert("sample_rate", self.rate.to_string()), - Event::Trace(ref mut event) => event.insert("sample_rate", self.rate.to_string()), + Event::Log(ref mut event) => { + event.insert(event_path!("sample_rate"), self.rate.to_string()) + } + Event::Trace(ref mut event) => { + event.insert(event_path!("sample_rate"), self.rate.to_string()) + } Event::Metric(_) => panic!("component can never receive metric events"), }; output.push(event);