Skip to content

Commit

Permalink
chore: replace various string paths with actual paths (vectordotdev#1…
Browse files Browse the repository at this point in the history
…8109)

* chore: string paths should not be used outside test code

* format fixes

* Update benches/event.rs

Co-authored-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update benches/event.rs

Co-authored-by: Stephen Wakely <fungus.humungus@gmail.com>

* parse dynamic strings before inserting to the event

* use event_path! for tracing fields

* Update benches/event.rs

Co-authored-by: Nathan Fox <fuchsnj@gmail.com>

* rebase on master and add comment

---------

Co-authored-by: Stephen Wakely <fungus.humungus@gmail.com>
Co-authored-by: Nathan Fox <fuchsnj@gmail.com>
  • Loading branch information
3 people committed Aug 8, 2023
1 parent 4a049d4 commit d8eefe3
Show file tree
Hide file tree
Showing 24 changed files with 206 additions and 149 deletions.
45 changes: 29 additions & 16 deletions benches/event.rs
@@ -1,6 +1,7 @@
use bytes::Bytes;
use criterion::{criterion_group, BatchSize, Criterion};
use vector::event::LogEvent;
use vrl::event_path;

fn benchmark_event_iterate(c: &mut Criterion) {
let mut group = c.benchmark_group("event/iterate");
Expand All @@ -9,9 +10,9 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1", Bytes::from("value1"));
log.insert("key2", Bytes::from("value2"));
log.insert("key3", Bytes::from("value3"));
log.insert(event_path!("key1"), Bytes::from("value1"));
log.insert(event_path!("key2"), Bytes::from("value2"));
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -23,9 +24,15 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1.nested1.nested2", Bytes::from("value1"));
log.insert("key1.nested1.nested3", Bytes::from("value4"));
log.insert("key3", Bytes::from("value3"));
log.insert(
event_path!("key1", "nested1", "nested2"),
Bytes::from("value1"),
);
log.insert(
event_path!("key1", "nested1", "nested3"),
Bytes::from("value4"),
);
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -37,8 +44,8 @@ fn benchmark_event_iterate(c: &mut Criterion) {
b.iter_batched_ref(
|| {
let mut log = LogEvent::default();
log.insert("key1.nested1[0]", Bytes::from("value1"));
log.insert("key1.nested1[1]", Bytes::from("value2"));
log.insert(event_path!("key1", "nested1", 0), Bytes::from("value1"));
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
log
},
|e| e.all_fields().unwrap().count(),
Expand All @@ -53,25 +60,31 @@ fn benchmark_event_create(c: &mut Criterion) {
group.bench_function("single-level", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1", Bytes::from("value1"));
log.insert("key2", Bytes::from("value2"));
log.insert("key3", Bytes::from("value3"));
log.insert(event_path!("key1"), Bytes::from("value1"));
log.insert(event_path!("key2"), Bytes::from("value2"));
log.insert(event_path!("key3"), Bytes::from("value3"));
})
});

group.bench_function("nested-keys", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1.nested1.nested2", Bytes::from("value1"));
log.insert("key1.nested1.nested3", Bytes::from("value4"));
log.insert("key3", Bytes::from("value3"));
log.insert(
event_path!("key1", "nested1", "nested2"),
Bytes::from("value1"),
);
log.insert(
event_path!("key1", "nested1", "nested3"),
Bytes::from("value4"),
);
log.insert(event_path!("key3"), Bytes::from("value3"));
})
});
group.bench_function("array", |b| {
b.iter(|| {
let mut log = LogEvent::default();
log.insert("key1.nested1[0]", Bytes::from("value1"));
log.insert("key1.nested1[1]", Bytes::from("value2"));
log.insert(event_path!("key1", "nested1", 0), Bytes::from("value1"));
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
})
});
}
Expand Down
3 changes: 2 additions & 1 deletion benches/lua.rs
Expand Up @@ -9,6 +9,7 @@ use vector::{
test_util::collect_ready,
transforms::{self, OutputBuffer, Transform},
};
use vrl::event_path;

fn bench_add_fields(c: &mut Criterion) {
let event = Event::from(LogEvent::default());
Expand Down Expand Up @@ -87,7 +88,7 @@ fn bench_field_filter(c: &mut Criterion) {
let events = (0..num_events)
.map(|i| {
let mut event = LogEvent::default();
event.insert("the_field", (i % 10).to_string());
event.insert(event_path!("the_field"), (i % 10).to_string());
Event::from(event)
})
.collect::<Vec<_>>();
Expand Down
38 changes: 28 additions & 10 deletions benches/remap.rs
Expand Up @@ -12,6 +12,7 @@ use vector::{
},
};
use vector_common::TimeZone;
use vrl::event_path;
use vrl::prelude::*;

criterion_group!(
Expand All @@ -35,9 +36,18 @@ fn benchmark_remap(c: &mut Criterion) {
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(output_1.get("foo").unwrap().to_string_lossy(), "bar");
debug_assert_eq!(output_1.get("bar").unwrap().to_string_lossy(), "baz");
debug_assert_eq!(output_1.get("copy").unwrap().to_string_lossy(), "buz");
debug_assert_eq!(
output_1.get(event_path!("foo")).unwrap().to_string_lossy(),
"bar"
);
debug_assert_eq!(
output_1.get(event_path!("bar")).unwrap().to_string_lossy(),
"baz"
);
debug_assert_eq!(
output_1.get(event_path!("copy")).unwrap().to_string_lossy(),
"buz"
);

result
};
Expand Down Expand Up @@ -67,7 +77,9 @@ fn benchmark_remap(c: &mut Criterion) {

let event = {
let mut event = Event::Log(LogEvent::from("augment me"));
event.as_mut_log().insert("copy_from", "buz".to_owned());
event
.as_mut_log()
.insert(event_path!("copy_from"), "buz".to_owned());
event
};

Expand All @@ -88,11 +100,11 @@ fn benchmark_remap(c: &mut Criterion) {
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(
output_1.get("foo").unwrap().to_string_lossy(),
output_1.get(event_path!("foo")).unwrap().to_string_lossy(),
r#"{"key": "value"}"#
);
debug_assert_eq!(
output_1.get("bar").unwrap().to_string_lossy(),
output_1.get(event_path!("bar")).unwrap().to_string_lossy(),
r#"{"key":"value"}"#
);

Expand Down Expand Up @@ -141,10 +153,16 @@ fn benchmark_remap(c: &mut Criterion) {
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();

debug_assert_eq!(output_1.get("number").unwrap(), &Value::Integer(1234));
debug_assert_eq!(output_1.get("bool").unwrap(), &Value::Boolean(true));
debug_assert_eq!(
output_1.get("timestamp").unwrap(),
output_1.get(event_path!("number")).unwrap(),
&Value::Integer(1234)
);
debug_assert_eq!(
output_1.get(event_path!("bool")).unwrap(),
&Value::Boolean(true)
);
debug_assert_eq!(
output_1.get(event_path!("timestamp")).unwrap(),
&Value::Timestamp(timestamp),
);

Expand Down Expand Up @@ -176,7 +194,7 @@ fn benchmark_remap(c: &mut Criterion) {
("bool", "yes"),
("timestamp", "19/06/2019:17:20:49 -0400"),
] {
event.as_mut_log().insert(key, value.to_owned());
event.as_mut_log().insert(event_path!(key), value.to_owned());
}

let timestamp =
Expand Down
15 changes: 8 additions & 7 deletions lib/codecs/src/decoding/format/gelf.rs
Expand Up @@ -17,6 +17,7 @@ use vrl::value::kind::Collection;
use vrl::value::{Kind, Value};

use super::{default_lossy, Deserializer};
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};

/// On GELF decoding behavior:
Expand Down Expand Up @@ -123,11 +124,11 @@ impl GelfDeserializer {
.into());
}

log.insert(VERSION, parsed.version.to_string());
log.insert(HOST, parsed.host.to_string());
log.insert(&GELF_TARGET_PATHS.version, parsed.version.to_string());
log.insert(&GELF_TARGET_PATHS.host, parsed.host.to_string());

if let Some(full_message) = &parsed.full_message {
log.insert(FULL_MESSAGE, full_message.to_string());
log.insert(&GELF_TARGET_PATHS.full_message, full_message.to_string());
}

if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
Expand All @@ -145,19 +146,19 @@ impl GelfDeserializer {
}

if let Some(level) = parsed.level {
log.insert(LEVEL, level);
log.insert(&GELF_TARGET_PATHS.level, level);
}
if let Some(facility) = &parsed.facility {
log.insert(FACILITY, facility.to_string());
log.insert(&GELF_TARGET_PATHS.facility, facility.to_string());
}
if let Some(line) = parsed.line {
log.insert(
LINE,
&GELF_TARGET_PATHS.line,
Value::Float(ordered_float::NotNan::new(line).expect("JSON doesn't allow NaNs")),
);
}
if let Some(file) = &parsed.file {
log.insert(FILE, file.to_string());
log.insert(&GELF_TARGET_PATHS.file, file.to_string());
}

if let Some(add) = &parsed.additional_fields {
Expand Down
18 changes: 8 additions & 10 deletions lib/codecs/src/encoding/format/gelf.rs
@@ -1,3 +1,4 @@
use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
use bytes::{BufMut, BytesMut};
use lookup::event_path;
Expand All @@ -12,7 +13,6 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -131,20 +131,18 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
}

// add the VERSION if it does not exist
if !log.contains(VERSION) {
log.insert(VERSION, GELF_VERSION);
if !log.contains(&GELF_TARGET_PATHS.version) {
log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
}

if !log.contains(HOST) {
if !log.contains(&GELF_TARGET_PATHS.host) {
err_missing_field(HOST)?;
}

if !log.contains(SHORT_MESSAGE) {
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
if !log.contains(&GELF_TARGET_PATHS.short_message) {
if let Some(message_key) = log_schema().message_key_target_path() {
if log.contains(message_key) {
log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
Expand Down
27 changes: 26 additions & 1 deletion lib/codecs/src/gelf.rs
Expand Up @@ -2,10 +2,11 @@

use once_cell::sync::Lazy;
use regex::Regex;
use vrl::owned_value_path;
use vrl::path::OwnedTargetPath;

/// GELF Message fields. Definitions from <https://docs.graylog.org/docs/gelf>.
pub mod gelf_fields {

/// (not a field) The latest version of the GELF specification.
pub const GELF_VERSION: &str = "1.1";

Expand Down Expand Up @@ -40,6 +41,30 @@ pub mod gelf_fields {
// < Every field with an underscore (_) prefix will be treated as an additional field. >
}

/// GELF owned target paths.
pub(crate) struct GelfTargetPaths {
pub version: OwnedTargetPath,
pub host: OwnedTargetPath,
pub full_message: OwnedTargetPath,
pub level: OwnedTargetPath,
pub facility: OwnedTargetPath,
pub line: OwnedTargetPath,
pub file: OwnedTargetPath,
pub short_message: OwnedTargetPath,
}

/// Lazily initialized singleton.
pub(crate) static GELF_TARGET_PATHS: Lazy<GelfTargetPaths> = Lazy::new(|| GelfTargetPaths {
version: OwnedTargetPath::event(owned_value_path!(gelf_fields::VERSION)),
host: OwnedTargetPath::event(owned_value_path!(gelf_fields::HOST)),
full_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::FULL_MESSAGE)),
level: OwnedTargetPath::event(owned_value_path!(gelf_fields::LEVEL)),
facility: OwnedTargetPath::event(owned_value_path!(gelf_fields::FACILITY)),
line: OwnedTargetPath::event(owned_value_path!(gelf_fields::LINE)),
file: OwnedTargetPath::event(owned_value_path!(gelf_fields::FILE)),
short_message: OwnedTargetPath::event(owned_value_path!(gelf_fields::SHORT_MESSAGE)),
});

/// Regex for matching valid field names. Must contain only word chars, periods and dashes.
/// Additional field names must also be prefixed with an `_` , however that is intentionally
/// omitted from this regex to be checked separately to create a specific error message.
Expand Down
24 changes: 10 additions & 14 deletions lib/vector-core/benches/event/log_event.rs
Expand Up @@ -6,20 +6,22 @@ use criterion::{
use lookup::event_path;
use vector_core::event::LogEvent;

fn default_log_event() -> LogEvent {
let mut log_event = LogEvent::default();
log_event.insert(event_path!("one"), 1);
log_event.insert(event_path!("two"), 2);
log_event.insert(event_path!("three"), 3);
log_event
}

fn rename_key_flat(c: &mut Criterion) {
let mut group: BenchmarkGroup<WallTime> =
c.benchmark_group("vector_core::event::log_event::LogEvent::rename_key_flat");
group.sampling_mode(SamplingMode::Auto);

group.bench_function("rename_flat_key (key is present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("one"), event_path!("1"));
},
Expand All @@ -29,13 +31,7 @@ fn rename_key_flat(c: &mut Criterion) {

group.bench_function("rename_flat_key (key is NOT present)", move |b| {
b.iter_batched(
|| {
let mut log_event = LogEvent::default();
log_event.insert("one", 1);
log_event.insert("two", 2);
log_event.insert("three", 3);
log_event
},
default_log_event,
|mut log_event| {
log_event.rename_key(event_path!("four"), event_path!("4"));
},
Expand Down

0 comments on commit d8eefe3

Please sign in to comment.