Skip to content

Commit

Permalink
fix: remap behavior for root types when using the Vector namespace (v…
Browse files Browse the repository at this point in the history
…ectordotdev#17807)

This fixes an issue with `remap` when using the `Vector` namespace,
where assigning a non-collection value (such as a string) directly to
the root ended up with that value nested under the `message` field. Now
it stays on the root.
  • Loading branch information
fuchsnj committed Jun 30, 2023
1 parent 3f6df61 commit c19938c
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 214 deletions.
183 changes: 173 additions & 10 deletions lib/vector-core/src/event/vrl_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix};
use snafu::Snafu;
use vrl::compiler::value::VrlValueConvert;
use vrl::compiler::{ProgramInfo, SecretTarget, Target};
use vrl::value::Value;
use vrl::prelude::Collection;
use vrl::value::{Kind, Value};

use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent};
use crate::config::log_schema;
use crate::config::{log_schema, LogNamespace};
use crate::event::metric::TagValue;
use crate::schema::Definition;

const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .timestamp, .kind, .tags";

Expand Down Expand Up @@ -114,11 +116,24 @@ impl VrlTarget {
}
}

/// Modifies a schema in the same way that the `into_events` function modifies the event
pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
let log_namespaces = input.log_namespaces().clone();

// both namespaces merge arrays, but only `Legacy` moves field definitions into a "message" field.
let merged_arrays = merge_array_definitions(input);
Definition::combine_log_namespaces(
&log_namespaces,
move_field_definitions_into_message(merged_arrays.clone()),
merged_arrays,
)
}

/// Turn the target back into events.
///
/// This returns an iterator of events as one event can be turned into multiple by assigning an
/// array to `.` in VRL.
pub fn into_events(self) -> TargetEvents {
pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
match self {
VrlTarget::LogEvent(value, metadata) => match value {
value @ Value::Object(_) => {
Expand All @@ -131,11 +146,16 @@ impl VrlTarget {
_marker: PhantomData,
}),

v => {
let mut log = LogEvent::new_with_metadata(metadata);
log.insert(log_schema().message_key(), v);
TargetEvents::One(log.into())
}
v => match log_namespace {
LogNamespace::Vector => {
TargetEvents::One(LogEvent::from_parts(v, metadata).into())
}
LogNamespace::Legacy => {
let mut log = LogEvent::new_with_metadata(metadata);
log.insert(log_schema().message_key(), v);
TargetEvents::One(log.into())
}
},
},
VrlTarget::Trace(value, metadata) => match value {
value @ Value::Object(_) => {
Expand Down Expand Up @@ -174,6 +194,53 @@ impl VrlTarget {
}
}

/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
/// or an object, that data is moved into the `message` field.
fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
let mut message = definition.event_kind().clone();
message.remove_object();
message.remove_array();

if !message.is_never() {
// We need to add the given message type to a field called `message`
// in the event.
let message = Kind::object(Collection::from(BTreeMap::from([(
log_schema().message_key().into(),
message,
)])));

definition.event_kind_mut().remove_bytes();
definition.event_kind_mut().remove_integer();
definition.event_kind_mut().remove_float();
definition.event_kind_mut().remove_boolean();
definition.event_kind_mut().remove_timestamp();
definition.event_kind_mut().remove_regex();
definition.event_kind_mut().remove_null();

*definition.event_kind_mut() = definition.event_kind().union(message);
}

definition
}

/// If the transform returns an array, the elements of this array will be separated
/// out into it's individual elements and passed downstream.
///
/// The potential types that the transform can output are any of the arrays
/// elements or any non-array elements that are within the definition. All these
/// definitions need to be merged together.
fn merge_array_definitions(mut definition: Definition) -> Definition {
if let Some(array) = definition.event_kind().as_array() {
let array_kinds = array.reduced_kind();

let kind = definition.event_kind_mut();
kind.remove_array();
*kind = kind.union(array_kinds);
}

definition
}

fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
if multi_value_tags {
let tag_values = value
Expand Down Expand Up @@ -589,11 +656,107 @@ mod test {
use lookup::owned_value_path;
use similar_asserts::assert_eq;
use vrl::btreemap;
use vrl::value::kind::Index;

use super::super::MetricValue;
use super::*;
use crate::metric_tags;

#[test]
fn test_field_definitions_in_message() {
let definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
assert_eq!(
Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
[LogNamespace::Legacy]
),
move_field_definitions_into_message(definition)
);

// Test when a message field already exists.
let definition = Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
[LogNamespace::Legacy],
);
assert_eq!(
Definition::new_with_default_metadata(
Kind::object(BTreeMap::from([(
"message".into(),
Kind::bytes().or_integer()
)])),
[LogNamespace::Legacy]
),
move_field_definitions_into_message(definition)
);
}

#[test]
fn test_merged_array_definitions_simple() {
// Test merging the array definitions where the schema definition
// is simple, containing only one possible type in the array.
let object: BTreeMap<vrl::value::kind::Field, Kind> = [
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]
.into();

let kind = Kind::array(Collection::from_unknown(Kind::object(object)));

let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);

let kind = Kind::object(BTreeMap::from([
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]));

let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
let merged = merge_array_definitions(definition);

assert_eq!(wanted, merged);
}

#[test]
fn test_merged_array_definitions_complex() {
// Test merging the array definitions where the schema definition
// is fairly complex containing multiple different possible types.
let object: BTreeMap<vrl::value::kind::Field, Kind> = [
("carrot".into(), Kind::bytes()),
("potato".into(), Kind::integer()),
]
.into();

let array: BTreeMap<Index, Kind> = [
(Index::from(0), Kind::integer()),
(Index::from(1), Kind::boolean()),
(
Index::from(2),
Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
),
]
.into();

let mut kind = Kind::bytes();
kind.add_object(object);
kind.add_array(array);

let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);

let mut kind = Kind::bytes();
kind.add_integer();
kind.add_boolean();
kind.add_object(BTreeMap::from([
("carrot".into(), Kind::bytes().or_undefined()),
("potato".into(), Kind::integer().or_undefined()),
("peas".into(), Kind::bytes().or_undefined()),
]));

let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
let merged = merge_array_definitions(definition);

assert_eq!(wanted, merged);
}

#[test]
fn log_get() {
let cases = vec![
Expand Down Expand Up @@ -755,7 +918,7 @@ mod test {
Ok(Some(value))
);
assert_eq!(
match target.into_events() {
match target.into_events(LogNamespace::Legacy) {
TargetEvents::One(event) => vec![event],
TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
Expand Down Expand Up @@ -901,7 +1064,7 @@ mod test {
Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();

assert_eq!(
match target.into_events() {
match target.into_events(LogNamespace::Legacy) {
TargetEvents::One(event) => vec![event],
TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
Expand Down
19 changes: 19 additions & 0 deletions lib/vector-core/src/schema/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,25 @@ impl Definition {
self
}

/// If the schema definition depends on the `LogNamespace`, this combines the individual
/// definitions for each `LogNamespace`.
pub fn combine_log_namespaces(
log_namespaces: &BTreeSet<LogNamespace>,
legacy: Self,
vector: Self,
) -> Self {
let mut combined =
Definition::new_with_default_metadata(Kind::never(), log_namespaces.clone());

if log_namespaces.contains(&LogNamespace::Legacy) {
combined = combined.merge(legacy);
}
if log_namespaces.contains(&LogNamespace::Vector) {
combined = combined.merge(vector);
}
combined
}

/// Returns an `OwnedTargetPath` into an event, based on the provided `meaning`, if the meaning exists.
pub fn meaning_path(&self, meaning: &str) -> Option<&OwnedTargetPath> {
match self.meaning.get(meaning) {
Expand Down
7 changes: 6 additions & 1 deletion src/conditions/vrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vrl::compiler::{CompilationResult, CompileConfig, Program, TypeState, VrlRun
use vrl::diagnostic::Formatter;
use vrl::value::Value;

use crate::config::LogNamespace;
use crate::event::TargetEvents;
use crate::{
conditions::{Condition, Conditional, ConditionalConfig},
Expand Down Expand Up @@ -84,12 +85,16 @@ pub struct Vrl {

impl Vrl {
fn run(&self, event: Event) -> (Event, RuntimeResult) {
let log_namespace = event
.maybe_as_log()
.map(|log| log.namespace())
.unwrap_or(LogNamespace::Legacy);
let mut target = VrlTarget::new(event, self.program.info(), false);
// TODO: use timezone from remap config
let timezone = TimeZone::default();

let result = Runtime::default().resolve(&mut target, &self.program, &timezone);
let original_event = match target.into_events() {
let original_event = match target.into_events(log_namespace) {
TargetEvents::One(event) => event,
_ => panic!("Event was modified in a condition. This is an internal compiler error."),
};
Expand Down
Loading

0 comments on commit c19938c

Please sign in to comment.