Skip to content

Commit

Permalink
chore: replace path tuples with actual target paths (vectordotdev#18139)
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Aug 2, 2023
1 parent 5a6ce73 commit 8068f1d
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 409 deletions.
9 changes: 2 additions & 7 deletions lib/codecs/src/decoding/format/bytes.rs
Expand Up @@ -89,10 +89,8 @@ impl Deserializer for BytesDeserializer {

#[cfg(test)]
mod tests {
use vector_core::config::log_schema;
use vrl::value::Value;

use super::*;
use vrl::value::Value;

#[test]
fn deserialize_bytes_legacy_namespace() {
Expand All @@ -105,10 +103,7 @@ mod tests {
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"foo".into()
);
assert_eq!(*log.get_message().unwrap(), "foo".into());
}

assert_eq!(events.next(), None);
Expand Down
24 changes: 12 additions & 12 deletions lib/vector-core/src/event/log_event.rs
Expand Up @@ -162,8 +162,8 @@ impl LogEvent {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), msg.into());

if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}

log
Expand Down Expand Up @@ -495,8 +495,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("message"),
LogNamespace::Legacy => log_schema()
.message_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.message_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -506,8 +506,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("timestamp"),
LogNamespace::Legacy => log_schema()
.timestamp_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.timestamp_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -525,8 +525,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("host"),
LogNamespace::Legacy => log_schema()
.host_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.host_key_target_path()
.and_then(|key| self.get(key)),
}
}

Expand All @@ -536,8 +536,8 @@ impl LogEvent {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
.source_type_key_target_path()
.and_then(|key| self.get(key)),
}
}
}
Expand Down Expand Up @@ -568,8 +568,8 @@ mod test_utils {
fn from(message: Bytes) -> Self {
let mut log = LogEvent::default();
log.maybe_insert(log_schema().message_key_target_path(), message);
if let Some(timestamp_key) = log_schema().timestamp_key() {
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.insert(timestamp_key, Utc::now());
}
log
}
Expand Down
10 changes: 2 additions & 8 deletions src/sinks/clickhouse/integration_tests.rs
Expand Up @@ -178,10 +178,7 @@ async fn insert_events_unix_timestamps() {
format!(
"{}",
exp_event
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.as_timestamp()
.unwrap()
Expand Down Expand Up @@ -242,10 +239,7 @@ timestamp_format = "unix""#,
format!(
"{}",
exp_event
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.as_timestamp()
.unwrap()
Expand Down
24 changes: 14 additions & 10 deletions src/source_sender/mod.rs
Expand Up @@ -23,7 +23,6 @@ mod errors;
use crate::config::{ComponentKey, OutputId};
use crate::schema::Definition;
pub use errors::{ClosedError, StreamSendError};
use lookup::PathPrefix;

pub(crate) const CHUNK_SIZE: usize = 1000;

Expand Down Expand Up @@ -356,18 +355,23 @@ impl Inner {
fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
if let Some(lag_time_metric) = &self.lag_time {
let timestamp = match event {
EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| {
log.get((PathPrefix::Event, timestamp_key))
.and_then(get_timestamp_millis)
}),
EventRef::Log(log) => {
log_schema()
.timestamp_key_target_path()
.and_then(|timestamp_key| {
log.get(timestamp_key).and_then(get_timestamp_millis)
})
}
EventRef::Metric(metric) => metric
.timestamp()
.map(|timestamp| timestamp.timestamp_millis()),
EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| {
trace
.get((PathPrefix::Event, timestamp_key))
.and_then(get_timestamp_millis)
}),
EventRef::Trace(trace) => {
log_schema()
.timestamp_key_target_path()
.and_then(|timestamp_key| {
trace.get(timestamp_key).and_then(get_timestamp_millis)
})
}
};
if let Some(timestamp) = timestamp {
// This will truncate precision for values larger than 2**52, but at that point the user
Expand Down
10 changes: 2 additions & 8 deletions src/sources/amqp.rs
Expand Up @@ -711,15 +711,9 @@ mod integration_test {

let log = events[0].as_log();
trace!("{:?}", log);
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"my message".into()
);
assert_eq!(*log.get_message().unwrap(), "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"amqp".into()
);
assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
Expand Down
5 changes: 1 addition & 4 deletions src/sources/aws_sqs/source.rs
Expand Up @@ -320,10 +320,7 @@ mod tests {
events[0]
.clone()
.as_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.get_timestamp()
.unwrap()
.to_string_lossy(),
now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
Expand Down
42 changes: 9 additions & 33 deletions src/sources/datadog_agent/tests.rs
Expand Up @@ -30,7 +30,7 @@ use vrl::value::Kind;
use crate::schema::Definition;
use crate::{
common::datadog::{DatadogMetricType, DatadogPoint, DatadogSeriesMetric},
config::{log_schema, SourceConfig, SourceContext},
config::{SourceConfig, SourceContext},
event::{
into_event_stream,
metric::{MetricKind, MetricSketch, MetricValue},
Expand Down Expand Up @@ -238,10 +238,7 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -303,10 +300,7 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -368,10 +362,7 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -432,10 +423,7 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -500,10 +488,7 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -574,10 +559,7 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -724,10 +706,7 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
Expand Down Expand Up @@ -1419,10 +1398,7 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(*log.get_source_type().unwrap(), "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down
20 changes: 4 additions & 16 deletions src/sources/docker_logs/tests.rs
Expand Up @@ -440,10 +440,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -640,10 +637,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -778,10 +772,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
assert_eq!(log[CONTAINER], id.into());
assert!(log.get(CREATED_AT).is_some());
assert_eq!(log[IMAGE], "busybox".into());
Expand Down Expand Up @@ -830,10 +821,7 @@ mod integration_tests {
.unwrap()
.assert_valid_for_event(&events[0]);
let log = events[0].as_log();
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
message.into()
);
assert_eq!(*log.get_message().unwrap(), message.into());
})
.await;
}
Expand Down

0 comments on commit 8068f1d

Please sign in to comment.