Skip to content

Commit

Permalink
feat(codecs): Add lossy option to JSON deserializer (vectordotdev#17628)
Browse files Browse the repository at this point in the history
Closes: vectordotdev#16406.

Adds a `decoding.json.lossy` option.

<!--
**Your PR title must conform to the conventional commit spec!**

  <type>(<scope>)!: <description>

  * `type` = chore, enhancement, feat, fix, docs
  * `!` = OPTIONAL: signals a breaking change
* `scope` = Optional when `type` is "chore" or "docs", available scopes
https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20
  * `description` = short description of the change

Examples:

  * enhancement(file source): Add `sort` option to sort discovered files
  * feat(new source): Initial `statsd` source
  * fix(file source): Fix a bug discovering new files
  * chore(external docs): Clarify `batch_size` option
-->
  • Loading branch information
dsmith3197 committed Jun 9, 2023
1 parent 45a28f8 commit bf7d796
Show file tree
Hide file tree
Showing 28 changed files with 1,131 additions and 686 deletions.
107 changes: 92 additions & 15 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::convert::TryInto;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use lookup::PathPrefix;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
config::{log_schema, DataType, LogNamespace},
event::Event,
Expand All @@ -16,7 +18,36 @@ use super::Deserializer;

/// Config used to build a `JsonDeserializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct JsonDeserializerConfig;
pub struct JsonDeserializerConfig {
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
/// Options for the JSON deserializer.
pub json: JsonDeserializerOptions,
}

/// JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of returning an error.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

const fn default_lossy() -> bool {
true
}

impl JsonDeserializerConfig {
/// Build the `JsonDeserializer` from this configuration.
Expand Down Expand Up @@ -56,19 +87,23 @@ impl JsonDeserializerConfig {

impl JsonDeserializerConfig {
/// Creates a new `JsonDeserializerConfig`.
pub fn new() -> Self {
Default::default()
pub fn new(options: JsonDeserializerOptions) -> Self {
Self { json: options }
}
}

/// Deserializer that builds `Event`s from a byte frame containing JSON.
#[derive(Debug, Clone, Default)]
pub struct JsonDeserializer;
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl JsonDeserializer {
/// Creates a new `JsonDeserializer`.
pub fn new() -> Self {
Default::default()
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}

Expand All @@ -84,8 +119,11 @@ impl Deserializer for JsonDeserializer {
return Ok(smallvec![]);
}

let json: serde_json::Value = serde_json::from_slice(&bytes)
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
let json: serde_json::Value = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;

// If the root is an Array, split it into multiple events
let mut events = match json {
Expand Down Expand Up @@ -119,8 +157,10 @@ impl Deserializer for JsonDeserializer {
}

impl From<&JsonDeserializerConfig> for JsonDeserializer {
fn from(_: &JsonDeserializerConfig) -> Self {
Self
fn from(config: &JsonDeserializerConfig) -> Self {
Self {
lossy: config.json.lossy,
}
}
}

Expand All @@ -133,7 +173,7 @@ mod tests {
#[test]
fn deserialize_json() {
let input = Bytes::from(r#"{ "foo": 123 }"#);
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
Expand All @@ -160,7 +200,7 @@ mod tests {
#[test]
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
Expand Down Expand Up @@ -197,7 +237,7 @@ mod tests {
#[test]
fn deserialize_skip_empty() {
let input = Bytes::from("");
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
Expand All @@ -208,7 +248,44 @@ mod tests {
#[test]
fn deserialize_error_invalid_json() {
let input = Bytes::from("{ foo");
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
}
}

#[test]
fn deserialize_lossy_replace_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(true);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();

{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["foo"], b"Hello \xEF\xBF\xBDWorld".into());
assert_eq!(
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.is_some(),
namespace == LogNamespace::Legacy
);
}

assert_eq!(events.next(), None);
}
}

#[test]
fn deserialize_non_lossy_error_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(false);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod syslog;
use ::bytes::Bytes;
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{NativeJsonDeserializer, NativeJsonDeserializerConfig};
use smallvec::SmallVec;
Expand Down
38 changes: 26 additions & 12 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use bytes::{Bytes, BytesMut};
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, NativeDeserializer,
NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig,
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig,
};
#[cfg(feature = "syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig};
Expand Down Expand Up @@ -243,7 +244,14 @@ pub enum DeserializerConfig {
/// Decodes the raw bytes as [JSON][json].
///
/// [json]: https://www.json.org/
Json,
Json {
/// Options for the JSON deserializer.
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
json: JsonDeserializerOptions,
},

#[cfg(feature = "syslog")]
/// Decodes the raw bytes as a Syslog message.
Expand Down Expand Up @@ -284,8 +292,8 @@ impl From<BytesDeserializerConfig> for DeserializerConfig {
}

impl From<JsonDeserializerConfig> for DeserializerConfig {
fn from(_: JsonDeserializerConfig) -> Self {
Self::Json
fn from(config: JsonDeserializerConfig) -> Self {
Self::Json { json: config.json }
}
}

Expand All @@ -307,7 +315,9 @@ impl DeserializerConfig {
pub fn build(&self) -> Deserializer {
match self {
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
DeserializerConfig::Json => Deserializer::Json(JsonDeserializerConfig.build()),
DeserializerConfig::Json { json } => {
Deserializer::Json(JsonDeserializerConfig::new(json.clone()).build())
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => {
Deserializer::Syslog(SyslogDeserializerConfig::default().build())
Expand All @@ -325,7 +335,7 @@ impl DeserializerConfig {
match self {
DeserializerConfig::Native => FramingConfig::LengthDelimited,
DeserializerConfig::Bytes
| DeserializerConfig::Json
| DeserializerConfig::Json { .. }
| DeserializerConfig::Gelf
| DeserializerConfig::NativeJson => FramingConfig::NewlineDelimited {
newline_delimited: Default::default(),
Expand All @@ -341,7 +351,9 @@ impl DeserializerConfig {
pub fn output_type(&self) -> DataType {
match self {
DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
DeserializerConfig::Json => JsonDeserializerConfig.output_type(),
DeserializerConfig::Json { json } => {
JsonDeserializerConfig::new(json.clone()).output_type()
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => SyslogDeserializerConfig::default().output_type(),
DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
Expand All @@ -354,7 +366,9 @@ impl DeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match self {
DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::Json => JsonDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::Json { json } => {
JsonDeserializerConfig::new(json.clone()).schema_definition(log_namespace)
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => {
SyslogDeserializerConfig::default().schema_definition(log_namespace)
Expand All @@ -371,12 +385,12 @@ impl DeserializerConfig {
pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
match (&self, framer) {
(
DeserializerConfig::Json | DeserializerConfig::NativeJson,
DeserializerConfig::Json { .. } | DeserializerConfig::NativeJson,
FramingConfig::NewlineDelimited { .. },
) => "application/x-ndjson",
(
DeserializerConfig::Gelf
| DeserializerConfig::Json
| DeserializerConfig::Json { .. }
| DeserializerConfig::NativeJson,
FramingConfig::CharacterDelimited {
character_delimited:
Expand All @@ -388,7 +402,7 @@ impl DeserializerConfig {
) => "application/json",
(DeserializerConfig::Native, _) => "application/octet-stream",
(
DeserializerConfig::Json
DeserializerConfig::Json { .. }
| DeserializerConfig::NativeJson
| DeserializerConfig::Bytes
| DeserializerConfig::Gelf,
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/decoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ mod tests {
let reader = StreamReader::new(stream);
let decoder = Decoder::new(
Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
Deserializer::Json(JsonDeserializer::new()),
Deserializer::Json(JsonDeserializer::default()),
);
let mut stream = FramedRead::new(reader, decoder);

Expand Down
6 changes: 4 additions & 2 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S
// "bytes" can be a top-level field and we aren't implicitly decoding everything into the
// `message` field... but it's close enough for now.
DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
DeserializerConfig::Json => SerializerConfig::Json(JsonSerializerConfig::default()),
DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
// TODO: We need to create an Avro serializer because, certainly, for any source decoding
// the data as Avro, we can't possibly send anything else without the source just
// immediately barfing.
Expand Down Expand Up @@ -184,7 +184,9 @@ fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Des
SerializerConfig::Avro { .. } => todo!(),
SerializerConfig::Csv { .. } => todo!(),
SerializerConfig::Gelf => DeserializerConfig::Gelf,
SerializerConfig::Json(_) => DeserializerConfig::Json,
SerializerConfig::Json(_) => DeserializerConfig::Json {
json: Default::default(),
},
SerializerConfig::Logfmt => todo!(),
SerializerConfig::Native => DeserializerConfig::Native,
SerializerConfig::NativeJson => DeserializerConfig::NativeJson,
Expand Down
8 changes: 6 additions & 2 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,9 @@ fn test_config_outputs() {
(
"json / single output",
TestCase {
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
multiple_outputs: false,
want: HashMap::from([(
None,
Expand All @@ -1620,7 +1622,9 @@ fn test_config_outputs() {
(
"json / multiple output",
TestCase {
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
multiple_outputs: true,
want: HashMap::from([
(
Expand Down
4 changes: 3 additions & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ impl ValidatableComponent for HttpClientConfig {
let config = Self {
endpoint: uri.to_string(),
interval: Duration::from_secs(1),
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
..Default::default()
};

Expand Down
Loading

0 comments on commit bf7d796

Please sign in to comment.