Skip to content

Commit

Permalink
feat: Refactor TraceEvent insert to use TargetPath compatible types (v…
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Jul 27, 2023
1 parent 28f5c23 commit f015b29
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 56 deletions.
10 changes: 3 additions & 7 deletions lib/vector-core/src/event/trace.rs
Expand Up @@ -85,18 +85,14 @@ impl TraceEvent {
self.0.contains(key.as_ref())
}

// TODO This should eventually use TargetPath for the `key` parameter.
// https://github.com/vectordotdev/vector/issues/18059
pub fn insert(
pub fn insert<'a>(
&mut self,
key: impl AsRef<str>,
key: impl TargetPath<'a>,
value: impl Into<Value> + Debug,
) -> Option<Value> {
self.0.insert(key.as_ref(), value.into())
self.0.insert(key, value.into())
}

// TODO Audit code and use this if possible.
// https://github.com/vectordotdev/vector/issues/18059
pub fn maybe_insert<'a, F: FnOnce() -> Value>(
&mut self,
prefix: PathPrefix,
Expand Down
17 changes: 9 additions & 8 deletions src/sinks/datadog/traces/tests.rs
Expand Up @@ -9,6 +9,7 @@ use ordered_float::NotNan;
use prost::Message;
use rmp_serde;
use vector_core::event::{BatchNotifier, BatchStatus, Event};
use vrl::event_path;

use super::{apm_stats::StatsPayload, dd_proto, ddsketch_full, DatadogTracesConfig};

Expand Down Expand Up @@ -95,15 +96,15 @@ fn simple_span(resource: String) -> BTreeMap<String, Value> {

pub fn simple_trace_event(resource: String) -> TraceEvent {
let mut t = TraceEvent::default();
t.insert("language", "a_language");
t.insert("agent_version", "1.23456");
t.insert("host", "a_host");
t.insert("env", "an_env");
t.insert("trace_id", Value::Integer(123));
t.insert("target_tps", Value::Integer(10));
t.insert("error_tps", Value::Integer(5));
t.insert(event_path!("language"), "a_language");
t.insert(event_path!("agent_version"), "1.23456");
t.insert(event_path!("host"), "a_host");
t.insert(event_path!("env"), "an_env");
t.insert(event_path!("trace_id"), Value::Integer(123));
t.insert(event_path!("target_tps"), Value::Integer(10));
t.insert(event_path!("error_tps"), Value::Integer(5));
t.insert(
"spans",
event_path!("spans"),
Value::Array(vec![Value::from(simple_span(resource))]),
);
t
Expand Down
3 changes: 2 additions & 1 deletion src/source_sender/mod.rs
Expand Up @@ -391,6 +391,7 @@ mod tests {
use chrono::{DateTime, Duration};
use rand::{thread_rng, Rng};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
use vrl::event_path;

use super::*;
use crate::metrics::{self, Controller};
Expand Down Expand Up @@ -424,7 +425,7 @@ mod tests {
async fn emits_lag_time_for_trace() {
emit_and_test(|timestamp| {
let mut trace = TraceEvent::default();
trace.insert("timestamp", timestamp);
trace.insert(event_path!("timestamp"), timestamp);
Event::Trace(trace)
})
.await;
Expand Down
15 changes: 9 additions & 6 deletions src/sources/datadog_agent/mod.rs
Expand Up @@ -34,6 +34,7 @@ use vector_common::internal_event::{EventsReceived, Registered};
use vector_config::configurable_component;
use vector_core::config::{LegacyKey, LogNamespace};
use vector_core::event::{BatchNotifier, BatchStatus};
use vrl::path::OwnedTargetPath;
use vrl::value::Kind;
use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply};

Expand Down Expand Up @@ -283,8 +284,8 @@ pub struct ApiKeyQueryParams {
#[derive(Clone)]
pub(crate) struct DatadogAgentSource {
pub(crate) api_key_extractor: ApiKeyExtractor,
pub(crate) log_schema_host_key: String,
pub(crate) log_schema_source_type_key: String,
pub(crate) log_schema_host_key: OwnedTargetPath,
pub(crate) log_schema_source_type_key: OwnedTargetPath,
pub(crate) log_namespace: LogNamespace,
pub(crate) decoder: Decoder,
protocol: &'static str,
Expand Down Expand Up @@ -334,11 +335,13 @@ impl DatadogAgentSource {
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema()
.host_key()
.map_or("".to_string(), |key| key.to_string()),
.host_key_target_path()
.expect("global log_schema.host_key to be valid path")
.clone(),
log_schema_source_type_key: log_schema()
.source_type_key()
.map_or("".to_string(), |key| key.to_string()),
.source_type_key_target_path()
.expect("global log_schema.source_type_key to be valid path")
.clone(),
decoder,
protocol,
logs_schema_definition: Arc::new(logs_schema_definition),
Expand Down
55 changes: 31 additions & 24 deletions src/sources/datadog_agent/traces.rs
Expand Up @@ -8,6 +8,7 @@ use ordered_float::NotNan;
use prost::Message;
use vector_common::internal_event::{CountByteSize, InternalEventHandle as _};
use vector_core::EstimatedJsonEncodedSizeOf;
use vrl::event_path;
use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter, Rejection, Reply};

use crate::{
Expand Down Expand Up @@ -142,11 +143,11 @@ fn handle_dd_trace_payload_v1(
.set_datadog_api_key(Arc::clone(k));
}
trace_event.insert(
source.log_schema_source_type_key.as_str(),
&source.log_schema_source_type_key,
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v2".to_string());
trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone());
trace_event.insert(&source.log_schema_host_key, hostname.clone());
trace_event.insert("env", env.clone());
trace_event.insert("agent_version", agent_version.clone());
trace_event.insert("target_tps", target_tps);
Expand All @@ -169,28 +170,34 @@ fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<Trace
.into_iter()
.map(|trace| {
let mut trace_event = TraceEvent::default();
trace_event.insert("priority", trace.priority as i64);
trace_event.insert("origin", trace.origin);
trace_event.insert("dropped", trace.dropped_trace);
trace_event.insert(event_path!("priority"), trace.priority as i64);
trace_event.insert(event_path!("origin"), trace.origin);
trace_event.insert(event_path!("dropped"), trace.dropped_trace);
let mut trace_tags = convert_tags(trace.tags);
trace_tags.extend(tags.clone());
trace_event.insert("tags", Value::from(trace_tags));
trace_event.insert(event_path!("tags"), Value::from(trace_tags));

trace_event.insert(
"spans",
event_path!("spans"),
trace
.spans
.into_iter()
.map(|s| Value::from(convert_span(s)))
.collect::<Vec<Value>>(),
);

trace_event.insert("container_id", payload.container_id.clone());
trace_event.insert("language_name", payload.language_name.clone());
trace_event.insert("language_version", payload.language_version.clone());
trace_event.insert("tracer_version", payload.tracer_version.clone());
trace_event.insert("runtime_id", payload.runtime_id.clone());
trace_event.insert("app_version", payload.app_version.clone());
trace_event.insert(event_path!("container_id"), payload.container_id.clone());
trace_event.insert(event_path!("language_name"), payload.language_name.clone());
trace_event.insert(
event_path!("language_version"),
payload.language_version.clone(),
);
trace_event.insert(
event_path!("tracer_version"),
payload.tracer_version.clone(),
);
trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
trace_event.insert(event_path!("app_version"), payload.app_version.clone());
trace_event
})
.collect()
Expand All @@ -217,11 +224,11 @@ fn handle_dd_trace_payload_v0(
// TODO trace_id is being forced into an i64 but
// the incoming payload is u64. This is a bug and needs to be fixed per:
// https://github.com/vectordotdev/vector/issues/14687
trace_event.insert("trace_id", dd_trace.trace_id as i64);
trace_event.insert("start_time", Utc.timestamp_nanos(dd_trace.start_time));
trace_event.insert("end_time", Utc.timestamp_nanos(dd_trace.end_time));
trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
trace_event.insert(
"spans",
event_path!("spans"),
dd_trace
.spans
.into_iter()
Expand All @@ -233,8 +240,8 @@ fn handle_dd_trace_payload_v0(
//... and each APM event is also mapped into its own event
.chain(decoded_payload.transactions.into_iter().map(|s| {
let mut trace_event = TraceEvent::default();
trace_event.insert("spans", vec![Value::from(convert_span(s))]);
trace_event.insert("dropped", true);
trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
trace_event.insert(event_path!("dropped"), true);
trace_event
})).collect();

Expand All @@ -252,15 +259,15 @@ fn handle_dd_trace_payload_v0(
.set_datadog_api_key(Arc::clone(k));
}
if let Some(lang) = lang {
trace_event.insert("language_name", lang.clone());
trace_event.insert(event_path!("language_name"), lang.clone());
}
trace_event.insert(
source.log_schema_source_type_key.as_str(),
&source.log_schema_source_type_key,
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v1".to_string());
trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone());
trace_event.insert("env", env.clone());
trace_event.insert(event_path!("payload_version"), "v1".to_string());
trace_event.insert(&source.log_schema_host_key, hostname.clone());
trace_event.insert(event_path!("env"), env.clone());
Event::Trace(trace_event)
})
.collect();
Expand Down
7 changes: 2 additions & 5 deletions src/sources/http_client/client.rs
Expand Up @@ -350,11 +350,8 @@ impl http_client::HttpClientContext for HttpClientContext {
}
}
Event::Trace(ref mut trace) => {
if let Some(source_type_key) = log_schema().source_type_key() {
trace.insert(
source_type_key.to_string(),
Bytes::from(HttpClientConfig::NAME),
);
if let Some(source_type_key) = log_schema().source_type_key_target_path() {
trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/test_util/mock/transforms/basic.rs
Expand Up @@ -118,14 +118,14 @@ impl FunctionTransform for BasicTransform {
}
}
Event::Trace(trace) => {
if let Some(message_key) = crate::config::log_schema().message_key() {
if let Some(message_key) = crate::config::log_schema().message_key_target_path() {
let mut v = trace
.get((PathPrefix::Event, message_key))
.get(message_key)
.unwrap()
.to_string_lossy()
.into_owned();
v.push_str(&self.suffix);
trace.insert(message_key.to_string().as_str(), Value::from(v));
trace.insert(message_key, Value::from(v));
}
}
};
Expand Down
9 changes: 7 additions & 2 deletions src/transforms/sample.rs
@@ -1,5 +1,6 @@
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vrl::event_path;

use crate::{
conditions::{AnyCondition, Condition},
Expand Down Expand Up @@ -145,8 +146,12 @@ impl FunctionTransform for Sample {

if num % self.rate == 0 {
match event {
Event::Log(ref mut event) => event.insert("sample_rate", self.rate.to_string()),
Event::Trace(ref mut event) => event.insert("sample_rate", self.rate.to_string()),
Event::Log(ref mut event) => {
event.insert(event_path!("sample_rate"), self.rate.to_string())
}
Event::Trace(ref mut event) => {
event.insert(event_path!("sample_rate"), self.rate.to_string())
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
output.push(event);
Expand Down

0 comments on commit f015b29

Please sign in to comment.