From 0bf6abd03fc92c80f306a20da9825c8298efe041 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 21 Jul 2023 02:14:54 +0100 Subject: [PATCH] chore(observability): count byte_size after transforming event (#17941) Before this change, `component_sent_event_bytes_total` was reporting the JSON byte size of the event, including any fields dropped by the `only_fields` and `except_fields` settings. This changes it so that the count is made after transforming the event. A complication arose whereby, if the service field was dropped, we would no longer have access to this value when emitting the `service` tag with the metrics. This also adds a `dropped_fields` field to the event metadata where this value can be stored and accessed. --------- Signed-off-by: Stephen Wakely --- lib/vector-core/src/event/log_event.rs | 16 ++- lib/vector-core/src/event/metadata.rs | 22 ++++ lib/vector-core/src/schema/meaning.rs | 17 ++++ lib/vector-core/src/schema/mod.rs | 1 + src/codecs/encoding/transformer.rs | 129 ++++++++++++++++++++++-- src/sinks/amqp/encoder.rs | 13 ++- src/sinks/azure_blob/test.rs | 24 ++++- src/sinks/datadog/logs/config.rs | 16 +-- src/sinks/datadog/logs/sink.rs | 18 +++- src/sinks/datadog/metrics/encoder.rs | 12 ++- src/sinks/elasticsearch/encoder.rs | 12 ++- src/sinks/elasticsearch/tests.rs | 12 +-- src/sinks/gcp/chronicle_unstructured.rs | 13 ++- src/sinks/gcp/cloud_storage.rs | 8 +- src/sinks/kafka/request_builder.rs | 6 +- src/sinks/loki/event.rs | 10 +- src/sinks/new_relic/config.rs | 20 ++-- src/sinks/new_relic/encoding.rs | 44 ++++++-- src/sinks/new_relic/service.rs | 8 +- src/sinks/new_relic/sink.rs | 48 ++------- src/sinks/pulsar/encoder.rs | 14 ++- src/sinks/splunk_hec/logs/encoder.rs | 9 +- src/sinks/splunk_hec/metrics/encoder.rs | 11 +- src/sinks/statsd/request_builder.rs | 11 +- src/sinks/util/encoding.rs | 48 ++++++--- src/sinks/util/metadata.rs | 10 +- src/sinks/util/request_builder.rs | 19 ++-- src/sinks/webhdfs/test.rs | 4 +- src/sources/splunk_hec/mod.rs | 7 +- 29 files changed, 427 insertions(+), 155 deletions(-) create mode 100644 lib/vector-core/src/schema/meaning.rs diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 41633b32d93d5..87da44d6040c6 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -292,11 +292,19 @@ impl LogEvent { } } + /// Retrieves the value of a field based on its meaning. + /// This will first check if the value has previously been dropped. It is worth being + /// aware that if the field has been dropped and then somehow re-added, we still fetch + /// the dropped value here. pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> { - self.metadata() - .schema_definition() - .meaning_path(meaning.as_ref()) - .and_then(|path| self.get(path)) + if let Some(dropped) = self.metadata().dropped_field(&meaning) { + Some(dropped) + } else { + self.metadata() + .schema_definition() + .meaning_path(meaning.as_ref()) + .and_then(|path| self.get(path)) + } } // TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed. diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index d86884be7582c..f577147e11ded 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -46,6 +46,14 @@ pub struct EventMetadata { /// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)] schema_definition: Arc, + + /// A store of values that may be dropped during the encoding process but may be needed + /// later on. The map is indexed by meaning. + /// Currently this is just used for the `service`. If the service field is dropped by `only_fields` + /// we need to ensure it is still available later on for emitting metrics tagged by the service. + /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable + /// we have to use `String`. + dropped_fields: BTreeMap, } fn default_metadata_value() -> Value { @@ -123,6 +131,19 @@ impl EventMetadata { pub fn set_splunk_hec_token(&mut self, secret: Arc) { self.secrets.insert(SPLUNK_HEC_TOKEN, secret); } + + /// Adds the value to the dropped fields list. + /// There is currently no way to remove a field from this list, so if a field is dropped + /// and then the field is re-added with a new value - the dropped value will still be + /// retrieved. + pub fn add_dropped_field(&mut self, meaning: String, value: Value) { + self.dropped_fields.insert(meaning, value); + } + + /// Fetches the dropped field by meaning. + pub fn dropped_field(&self, meaning: impl AsRef) -> Option<&Value> { + self.dropped_fields.get(meaning.as_ref()) + } } impl Default for EventMetadata { @@ -134,6 +155,7 @@ impl Default for EventMetadata { schema_definition: default_schema_definition(), source_id: None, upstream_id: None, + dropped_fields: BTreeMap::new(), } } } diff --git a/lib/vector-core/src/schema/meaning.rs b/lib/vector-core/src/schema/meaning.rs new file mode 100644 index 0000000000000..ab766b0986924 --- /dev/null +++ b/lib/vector-core/src/schema/meaning.rs @@ -0,0 +1,17 @@ +//! Constants for commonly used semantic meanings. + +/// The service typically represents the application that generated the event. +pub const SERVICE: &str = "service"; + +/// The main text message of the event. +pub const MESSAGE: &str = "message"; + +/// The main timestamp of the event. +pub const TIMESTAMP: &str = "timestamp"; + +/// The hostname of the machine where the event was generated. 
+pub const HOST: &str = "host"; + +pub const SOURCE: &str = "source"; +pub const SEVERITY: &str = "severity"; +pub const TRACE_ID: &str = "trace_id"; diff --git a/lib/vector-core/src/schema/mod.rs b/lib/vector-core/src/schema/mod.rs index 96f6d99442fa8..2d1c01b8d281f 100644 --- a/lib/vector-core/src/schema/mod.rs +++ b/lib/vector-core/src/schema/mod.rs @@ -1,4 +1,5 @@ mod definition; +pub mod meaning; mod requirement; pub use definition::Definition; diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs index 79137020e6a10..df711bd8f065b 100644 --- a/src/codecs/encoding/transformer.rs +++ b/src/codecs/encoding/transformer.rs @@ -12,6 +12,7 @@ use lookup::{ use serde::{Deserialize, Deserializer}; use vector_config::configurable_component; use vector_core::event::{LogEvent, MaybeAsLogMut}; +use vector_core::schema::meaning; use vrl::value::Value; use crate::{event::Event, serde::skip_serializing_if_default}; @@ -128,20 +129,52 @@ impl Transformer { fn apply_only_fields(&self, log: &mut LogEvent) { if let Some(only_fields) = self.only_fields.as_ref() { - let old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new())); + let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new())); for field in only_fields { - if let Some(value) = old_value.get(field) { - log.insert((PathPrefix::Event, field), value.clone()); + if let Some(value) = old_value.remove(field, true) { + log.insert((PathPrefix::Event, field), value); + } + } + + // We may need the service field to apply tags to emitted metrics after the log message has been pruned. If there + // is a service meaning, we move this value to `dropped_fields` in the metadata. + // If the field is still in the new log message after pruning it will have been removed from `old_value` above. + let service_path = log + .metadata() + .schema_definition() + .meaning_path(meaning::SERVICE); + if let Some(service_path) = service_path { + let mut new_log = LogEvent::from(old_value); + if let Some(service) = new_log.remove(service_path) { + log.metadata_mut() + .add_dropped_field(meaning::SERVICE.to_string(), service); } } } } fn apply_except_fields(&self, log: &mut LogEvent) { + use lookup::path::TargetPath; + if let Some(except_fields) = self.except_fields.as_ref() { + let service_path = log + .metadata() + .schema_definition() + .meaning_path(meaning::SERVICE) + .map(|path| path.value_path().to_string()); + for field in except_fields { - log.remove(field.as_str()); + let value = log.remove(field.as_str()); + + // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to + // refer to this later when emitting metrics. 
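+ // Only the removed field whose path matches the schema's `service` meaning is kept; it is
+ // stored in the event metadata via `add_dropped_field`, keyed by the `service` meaning.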
+ if let Some(v) = value { + if matches!(service_path.as_ref(), Some(path) if path == field) { + log.metadata_mut() + .add_dropped_field(meaning::SERVICE.to_string(), v); + } + } } } } @@ -213,10 +246,15 @@ pub enum TimestampFormat { #[cfg(test)] mod tests { use indoc::indoc; - use vector_core::config::log_schema; + use lookup::path::parse_target_path; + use vector_common::btreemap; + use vector_core::config::{log_schema, LogNamespace}; + use vrl::value::Kind; + + use crate::config::schema; use super::*; - use std::collections::BTreeMap; + use std::{collections::BTreeMap, sync::Arc}; #[test] fn serialize() { @@ -374,4 +412,83 @@ mod tests { "#}); assert!(config.is_err()) } + + #[test] + fn only_fields_with_service() { + let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap(); + let mut log = LogEvent::default(); + { + log.insert("message", 1); + log.insert("thing.service", "carrot"); + } + + let schema = schema::Definition::new_with_default_metadata( + Kind::object(btreemap! { + "thing" => Kind::object(btreemap! { + "service" => Kind::bytes(), + }) + }), + [LogNamespace::Vector], + ); + + let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service"); + + let mut event = Event::from(log); + + event + .metadata_mut() + .set_schema_definition(&Arc::new(schema)); + + transformer.transform(&mut event); + assert!(event.as_mut_log().contains("message")); + + // Event no longer contains the service field. + assert!(!event.as_mut_log().contains("thing.service")); + + // But we can still get the service by meaning. + assert_eq!( + &Value::from("carrot"), + event.as_log().get_by_meaning("service").unwrap() + ); + } + + #[test] + fn except_fields_with_service() { + let transformer: Transformer = + toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap(); + let mut log = LogEvent::default(); + { + log.insert("message", 1); + log.insert("thing.service", "carrot"); + } + + let schema = schema::Definition::new_with_default_metadata( + Kind::object(btreemap! { + "thing" => Kind::object(btreemap! { + "service" => Kind::bytes(), + }) + }), + [LogNamespace::Vector], + ); + + let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service"); + + let mut event = Event::from(log); + + event + .metadata_mut() + .set_schema_definition(&Arc::new(schema)); + + transformer.transform(&mut event); + assert!(event.as_mut_log().contains("message")); + + // Event no longer contains the service field. + assert!(!event.as_mut_log().contains("thing.service")); + + // But we can still get the service by meaning. 
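+ // `get_by_meaning` falls back to the `dropped_fields` entry recorded by the transformer,
+ // so the value is still resolvable even though it is gone from the event itself.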
+ assert_eq!( + &Value::from("carrot"), + event.as_log().get_by_meaning("service").unwrap() + ); + } } diff --git a/src/sinks/amqp/encoder.rs b/src/sinks/amqp/encoder.rs index d3d449811372f..a1af7ec54e77b 100644 --- a/src/sinks/amqp/encoder.rs +++ b/src/sinks/amqp/encoder.rs @@ -3,6 +3,7 @@ use crate::sinks::prelude::*; use bytes::BytesMut; use std::io; use tokio_util::codec::Encoder as _; +use vector_core::config::telemetry; #[derive(Clone, Debug)] pub(super) struct AmqpEncoder { @@ -11,9 +12,17 @@ pub(super) struct AmqpEncoder { } impl encoding::Encoder for AmqpEncoder { - fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result { + fn encode_input( + &self, + mut input: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { let mut body = BytesMut::new(); self.transformer.transform(&mut input); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + let mut encoder = self.encoder.clone(); encoder .encode(input, &mut body) @@ -22,6 +31,6 @@ impl encoding::Encoder for AmqpEncoder { let body = body.freeze(); write_all(writer, 1, body.as_ref())?; - Ok(body.len()) + Ok((body.len(), byte_size)) } } diff --git a/src/sinks/azure_blob/test.rs b/src/sinks/azure_blob/test.rs index de47fd8bdebe4..0c4ad1f38dd53 100644 --- a/src/sinks/azure_blob/test.rs +++ b/src/sinks/azure_blob/test.rs @@ -4,7 +4,8 @@ use codecs::{ encoding::{Framer, FramingConfig}, NewlineDelimitedEncoder, TextSerializerConfig, }; -use vector_core::partition::Partitioner; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{partition::Partitioner, EstimatedJsonEncodedSizeOf}; use super::config::AzureBlobSinkConfig; use super::request_builder::AzureBlobRequestOptions; @@ -68,10 +69,13 @@ fn azure_blob_build_request_without_compression() { compression, }; + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + let (metadata, request_metadata_builder, _events) = request_options.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = request_metadata_builder.build(&payload); let request = request_options.build_request(metadata, request_metadata, payload); @@ -112,10 +116,14 @@ fn azure_blob_build_request_with_compression() { ), compression, }; + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + let (metadata, request_metadata_builder, _events) = request_options.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = request_metadata_builder.build(&payload); let request = request_options.build_request(metadata, request_metadata, payload); @@ -157,10 +165,13 @@ fn azure_blob_build_request_with_time_format() { compression, }; + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + let (metadata, request_metadata_builder, _events) = request_options.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = request_metadata_builder.build(&payload); let request = 
request_options.build_request(metadata, request_metadata, payload); @@ -205,10 +216,13 @@ fn azure_blob_build_request_with_uuid() { compression, }; + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + let (metadata, request_metadata_builder, _events) = request_options.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = request_metadata_builder.build(&payload); let request = request_options.build_request(metadata, request_metadata, payload); diff --git a/src/sinks/datadog/logs/config.rs b/src/sinks/datadog/logs/config.rs index 80fb5293257a1..c6c93da795fc3 100644 --- a/src/sinks/datadog/logs/config.rs +++ b/src/sinks/datadog/logs/config.rs @@ -3,7 +3,7 @@ use std::{convert::TryFrom, sync::Arc}; use indoc::indoc; use tower::ServiceBuilder; use vector_config::configurable_component; -use vector_core::config::proxy::ProxyConfig; +use vector_core::{config::proxy::ProxyConfig, schema::meaning}; use vrl::value::Kind; use super::{service::LogApiRetry, sink::LogSinkBuilder}; @@ -176,13 +176,13 @@ impl SinkConfig for DatadogLogsConfig { fn input(&self) -> Input { let requirement = schema::Requirement::empty() - .required_meaning("message", Kind::bytes()) - .required_meaning("timestamp", Kind::timestamp()) - .optional_meaning("host", Kind::bytes()) - .optional_meaning("source", Kind::bytes()) - .optional_meaning("severity", Kind::bytes()) - .optional_meaning("service", Kind::bytes()) - .optional_meaning("trace_id", Kind::bytes()); + .required_meaning(meaning::MESSAGE, Kind::bytes()) + .required_meaning(meaning::TIMESTAMP, Kind::timestamp()) + .optional_meaning(meaning::HOST, Kind::bytes()) + .optional_meaning(meaning::SOURCE, Kind::bytes()) + .optional_meaning(meaning::SEVERITY, Kind::bytes()) + .optional_meaning(meaning::SERVICE, Kind::bytes()) + .optional_meaning(meaning::TRACE_ID, Kind::bytes()); Input::log().with_schema_requirement(requirement) } diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index fd2f132d13f85..ca1bb60e8de7a 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -7,7 +7,7 @@ use futures::stream::{BoxStream, StreamExt}; use lookup::event_path; use snafu::Snafu; use tower::Service; -use vector_common::request_metadata::RequestMetadata; +use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata}; use vector_core::{ event::{Event, EventFinalizers, Finalizable, Value}, partition::Partitioner, @@ -125,7 +125,11 @@ impl JsonEncoding { } impl crate::sinks::util::encoding::Encoder> for JsonEncoding { - fn encode_input(&self, mut input: Vec, writer: &mut dyn io::Write) -> io::Result { + fn encode_input( + &self, + mut input: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { for event in input.iter_mut() { let log = event.as_mut_log(); let message_path = log @@ -219,7 +223,7 @@ impl RequestBuilder<(Option>, Vec)> for LogRequestBuilder { // to (un)compressed size limitations. 
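// Note: `encode_input` now also returns the grouped byte size of the events, counted after
// they have been transformed, which is what `component_sent_event_bytes_total` reports.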
let mut buf = Vec::new(); let n_events = events.len(); - let uncompressed_size = self.encoder().encode_input(events, &mut buf)?; + let (uncompressed_size, byte_size) = self.encoder().encode_input(events, &mut buf)?; if uncompressed_size > MAX_PAYLOAD_BYTES { return Err(RequestBuildError::PayloadTooBig); } @@ -230,9 +234,13 @@ impl RequestBuilder<(Option>, Vec)> for LogRequestBuilder { let bytes = compressor.into_inner().freeze(); if self.compression.is_compressed() { - Ok(EncodeResult::compressed(bytes, uncompressed_size)) + Ok(EncodeResult::compressed( + bytes, + uncompressed_size, + byte_size, + )) } else { - Ok(EncodeResult::uncompressed(bytes)) + Ok(EncodeResult::uncompressed(bytes, byte_size)) } } diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 49e0c6e61aacc..b056d672cf46a 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -10,10 +10,12 @@ use chrono::{DateTime, Utc}; use once_cell::sync::OnceCell; use prost::Message; use snafu::{ResultExt, Snafu}; +use vector_common::request_metadata::GroupedCountByteSize; use vector_core::{ - config::{log_schema, LogSchema}, + config::{log_schema, telemetry, LogSchema}, event::{metric::MetricSketch, Metric, MetricTags, MetricValue}, metrics::AgentDDSketch, + EstimatedJsonEncodedSizeOf, }; use super::config::{ @@ -122,6 +124,7 @@ struct EncoderState { written: usize, buf: Vec, processed: Vec, + byte_size: GroupedCountByteSize, } impl Default for EncoderState { @@ -131,6 +134,7 @@ impl Default for EncoderState { written: 0, buf: Vec::with_capacity(1024), processed: Vec::new(), + byte_size: telemetry().create_request_count_byte_size(), } } } @@ -202,6 +206,10 @@ impl DatadogMetricsEncoder { // Clear our temporary buffer before any encoding. self.state.buf.clear(); + self.state + .byte_size + .add_event(&metric, metric.estimated_json_encoded_size_of()); + match self.endpoint { // Series metrics are encoded via JSON, in an incremental fashion. DatadogMetricsEndpoint::Series => { @@ -349,7 +357,7 @@ impl DatadogMetricsEncoder { if recommended_splits == 1 { // "One" split means no splits needed: our payload didn't exceed either of the limits. 
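// The per-metric sizes accumulated in `state.byte_size` travel with the payload so the
// request metadata can report the event byte sizes for telemetry.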
Ok(( - EncodeResult::compressed(payload, raw_bytes_written), + EncodeResult::compressed(payload, raw_bytes_written, self.state.byte_size.clone()), processed, )) } else { diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index f5b39a52b23a0..8f136bd02a195 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -4,9 +4,9 @@ use serde::Serialize; use vector_buffers::EventCount; use vector_common::{ json_size::JsonSize, - request_metadata::{EventCountTags, GetEventCountTags}, + request_metadata::{EventCountTags, GetEventCountTags, GroupedCountByteSize}, }; -use vector_core::{event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_core::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ codecs::Transformer, @@ -68,12 +68,15 @@ impl Encoder> for ElasticsearchEncoder { &self, input: Vec, writer: &mut dyn Write, - ) -> std::io::Result { + ) -> std::io::Result<(usize, GroupedCountByteSize)> { let mut written_bytes = 0; + let mut byte_size = telemetry().create_request_count_byte_size(); for event in input { let log = { let mut event = Event::from(event.log); self.transformer.transform(&mut event); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + event.into_log() }; written_bytes += write_bulk_action( @@ -92,7 +95,8 @@ impl Encoder> for ElasticsearchEncoder { Ok(()) })?; } - Ok(written_bytes) + + Ok((written_bytes, byte_size)) } } diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index da0d3cecea965..a9cc8576a6843 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -49,7 +49,7 @@ async fn sets_create_action_when_configured() { log.insert("action", "crea"); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( @@ -103,7 +103,7 @@ async fn encode_datastream_mode() { log.insert("data_stream", data_stream_body()); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( @@ -154,7 +154,7 @@ async fn encode_datastream_mode_no_routing() { .expect("invalid timestamp"), ); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( @@ -299,7 +299,7 @@ async fn encode_datastream_mode_no_sync() { ); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( @@ -339,7 +339,7 @@ async fn allows_using_except_fields() { log.insert("idx", "purple"); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( @@ -374,7 +374,7 @@ async fn allows_using_only_fields() { log.insert("idx", "purple"); let mut encoded = vec![]; - let encoded_size = es + let (encoded_size, _json_size) = es .request_builder .encoder .encode_input( diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs index 2e91b15ca3548..5b65089852ea0 100644 --- a/src/sinks/gcp/chronicle_unstructured.rs +++ b/src/sinks/gcp/chronicle_unstructured.rs @@ -12,12 +12,13 @@ use snafu::Snafu; use std::io; use tokio_util::codec::Encoder as _; use tower::{Service, ServiceBuilder}; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use 
vector_config::configurable_component; use vector_core::{ - config::{AcknowledgementsConfig, Input}, + config::{telemetry, AcknowledgementsConfig, Input}, event::{Event, EventFinalizers, Finalizable}, sink::VectorSink, + EstimatedJsonEncodedSizeOf, }; use vrl::value::Kind; @@ -307,9 +308,10 @@ impl Encoder<(String, Vec)> for ChronicleEncoder { &self, input: (String, Vec), writer: &mut dyn io::Write, - ) -> io::Result { + ) -> io::Result<(usize, GroupedCountByteSize)> { let (partition_key, events) = input; let mut encoder = self.encoder.clone(); + let mut byte_size = telemetry().create_request_count_byte_size(); let events = events .into_iter() .filter_map(|mut event| { @@ -320,6 +322,9 @@ impl Encoder<(String, Vec)> for ChronicleEncoder { .cloned(); let mut bytes = BytesMut::new(); self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + encoder.encode(event, &mut bytes).ok()?; let mut value = json!({ @@ -349,7 +354,7 @@ impl Encoder<(String, Vec)> for ChronicleEncoder { Ok(()) })?; - Ok(size) + Ok((size, byte_size)) } } diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index ff4f6bb378a21..45b051a87ca3d 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -410,7 +410,9 @@ mod tests { use codecs::encoding::FramingConfig; use codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig}; use futures_util::{future::ready, stream}; + use vector_common::request_metadata::GroupedCountByteSize; use vector_core::partition::Partitioner; + use vector_core::EstimatedJsonEncodedSizeOf; use crate::event::LogEvent; use crate::test_util::{ @@ -491,10 +493,14 @@ mod tests { .unwrap() .partition(&log) .expect("key wasn't provided"); + + let mut byte_size = GroupedCountByteSize::new_untagged(); + byte_size.add_event(&log, log.estimated_json_encoded_size_of()); + let request_settings = request_settings(&sink_config); let (metadata, metadata_request_builder, _events) = request_settings.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = metadata_request_builder.build(&payload); request_settings.build_request(metadata, request_metadata, payload) diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index 9d1edd0d97c43..c2d3c7aaa9219 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -37,8 +37,6 @@ impl KafkaRequestBuilder { }) .ok()?; - let metadata_builder = RequestMetadataBuilder::from_event(&event); - let metadata = KafkaRequestMetadata { finalizers: event.take_finalizers(), key: get_key(&event, &self.key_field), @@ -48,6 +46,10 @@ impl KafkaRequestBuilder { }; self.transformer.transform(&mut event); let mut body = BytesMut::new(); + + // Ensure the metadata builder is built after transforming the event so we have the event + // size taking into account any dropped fields. 
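+ // `from_event` captures the estimated JSON size of the now-transformed event, so the
+ // emitted byte-size telemetry reflects the pruned payload.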
+ let metadata_builder = RequestMetadataBuilder::from_event(&event); self.encoder.encode(event, &mut body).ok()?; let body = body.freeze(); diff --git a/src/sinks/loki/event.rs b/src/sinks/loki/event.rs index 22d399f970710..28a115243c3e8 100644 --- a/src/sinks/loki/event.rs +++ b/src/sinks/loki/event.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, io}; use crate::sinks::{prelude::*, util::encoding::Encoder}; use bytes::Bytes; use serde::{ser::SerializeSeq, Serialize}; +use vector_core::config::telemetry; pub type Labels = Vec<(String, String)>; @@ -20,8 +21,13 @@ impl Encoder> for LokiBatchEncoder { &self, input: Vec, writer: &mut dyn io::Write, - ) -> io::Result { + ) -> io::Result<(usize, GroupedCountByteSize)> { let count = input.len(); + let mut byte_size = telemetry().create_request_count_byte_size(); + for event in &input { + byte_size.add_event(event, event.estimated_json_encoded_size_of()); + } + let batch = LokiBatch::from(input); let body = match self.0 { LokiBatchEncoding::Json => { @@ -52,7 +58,7 @@ impl Encoder> for LokiBatchEncoder { batch.encode() } }; - write_all(writer, count, &body).map(|()| body.len()) + write_all(writer, count, &body).map(|()| (body.len(), byte_size)) } } diff --git a/src/sinks/new_relic/config.rs b/src/sinks/new_relic/config.rs index 5900755b12207..af892849f0964 100644 --- a/src/sinks/new_relic/config.rs +++ b/src/sinks/new_relic/config.rs @@ -1,25 +1,15 @@ use std::{fmt::Debug, sync::Arc}; -use futures::FutureExt; use http::Uri; use tower::ServiceBuilder; use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; use super::{ healthcheck, NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, NewRelicSinkError, }; -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, - http::HttpClient, - sinks::util::{ - retries::RetryLogic, service::ServiceBuilderExt, BatchConfig, Compression, - SinkBatchSettings, TowerRequestConfig, - }, - tls::TlsSettings, -}; + +use crate::{http::HttpClient, sinks::prelude::*}; /// New Relic region. 
#[configurable_component] @@ -164,8 +154,10 @@ impl SinkConfig for NewRelicConfig { let sink = NewRelicSink { service, - transformer: self.encoding.clone(), - encoder: NewRelicEncoder, + encoder: NewRelicEncoder { + transformer: self.encoding.clone(), + credentials: Arc::clone(&credentials), + }, credentials, compression: self.compression, batcher_settings, diff --git a/src/sinks/new_relic/encoding.rs b/src/sinks/new_relic/encoding.rs index ec71aa4f40879..f058161d20fb9 100644 --- a/src/sinks/new_relic/encoding.rs +++ b/src/sinks/new_relic/encoding.rs @@ -1,28 +1,54 @@ -use std::io; +use std::{io, sync::Arc}; use serde::Serialize; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, event::Event, EstimatedJsonEncodedSizeOf}; -use super::{NewRelicApiModel, NewRelicSinkError}; -use crate::sinks::util::encoding::{as_tracked_write, Encoder}; +use super::{ + EventsApiModel, LogsApiModel, MetricsApiModel, NewRelicApi, NewRelicApiModel, + NewRelicCredentials, NewRelicSinkError, +}; +use crate::sinks::{ + prelude::*, + util::encoding::{as_tracked_write, Encoder}, +}; -pub struct NewRelicEncoder; +pub struct NewRelicEncoder { + pub(super) transformer: Transformer, + pub(super) credentials: Arc, +} -impl Encoder> for NewRelicEncoder { +impl Encoder> for NewRelicEncoder { fn encode_input( &self, - input: Result, + mut input: Vec, writer: &mut dyn io::Write, - ) -> io::Result { - let json = match input? { + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + + for event in input.iter_mut() { + self.transformer.transform(event); + byte_size.add_event(event, event.estimated_json_encoded_size_of()); + } + + let api_model = match self.credentials.api { + NewRelicApi::Events => NewRelicApiModel::Events(EventsApiModel::try_from(input)?), + NewRelicApi::Metrics => NewRelicApiModel::Metrics(MetricsApiModel::try_from(input)?), + NewRelicApi::Logs => NewRelicApiModel::Logs(LogsApiModel::try_from(input)?), + }; + + let json = match api_model { NewRelicApiModel::Events(ev_api_model) => to_json(&ev_api_model)?, NewRelicApiModel::Metrics(met_api_model) => to_json(&met_api_model)?, NewRelicApiModel::Logs(log_api_model) => to_json(&log_api_model)?, }; + let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| { writer.write_all(json)?; Ok(()) })?; - io::Result::Ok(size) + + io::Result::Ok((size, byte_size)) } } diff --git a/src/sinks/new_relic/service.rs b/src/sinks/new_relic/service.rs index 290276b72b5cf..2b69aae492348 100644 --- a/src/sinks/new_relic/service.rs +++ b/src/sinks/new_relic/service.rs @@ -5,21 +5,15 @@ use std::{ }; use bytes::Bytes; -use futures::future::BoxFuture; use http::{ header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE}, Request, }; use hyper::Body; -use tower::Service; use tracing::Instrument; -use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - stream::DriverResponse, -}; use super::{NewRelicCredentials, NewRelicSinkError}; +use crate::sinks::prelude::*; use crate::{http::HttpClient, sinks::util::Compression}; #[derive(Debug, Clone)] diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs index c56c4d4b9f425..384a1b7a54e52 100644 --- a/src/sinks/new_relic/sink.rs +++ b/src/sinks/new_relic/sink.rs @@ -1,28 +1,11 @@ -use std::{convert::TryFrom, fmt::Debug, num::NonZeroUsize, sync::Arc}; +use std::{fmt::Debug, num::NonZeroUsize, 
sync::Arc}; use async_trait::async_trait; use bytes::Bytes; -use futures::stream::{BoxStream, StreamExt}; -use tower::Service; -use vector_common::request_metadata::RequestMetadata; -use vector_core::{ - event::{EventFinalizers, Finalizable}, - stream::{BatcherSettings, DriverResponse}, -}; -use super::{ - EventsApiModel, LogsApiModel, MetricsApiModel, NewRelicApi, NewRelicApiModel, - NewRelicApiRequest, NewRelicCredentials, NewRelicEncoder, -}; +use super::{NewRelicApiRequest, NewRelicCredentials, NewRelicEncoder}; use crate::{ - codecs::Transformer, - event::Event, - http::get_http_scheme_from_uri, - internal_events::SinkRequestBuildError, - sinks::util::{ - builder::SinkBuilderExt, metadata::RequestMetadataBuilder, request_builder::EncodeResult, - Compression, RequestBuilder, StreamSink, - }, + http::get_http_scheme_from_uri, internal_events::SinkRequestBuildError, sinks::prelude::*, }; #[derive(Debug)] @@ -69,7 +52,6 @@ impl From for std::io::Error { } struct NewRelicRequestBuilder { - transformer: Transformer, encoder: NewRelicEncoder, compression: Compression, credentials: Arc, @@ -77,7 +59,7 @@ struct NewRelicRequestBuilder { impl RequestBuilder> for NewRelicRequestBuilder { type Metadata = EventFinalizers; - type Events = Result; + type Events = Vec; type Encoder = NewRelicEncoder; type Payload = Bytes; type Request = NewRelicApiRequest; @@ -95,26 +77,10 @@ impl RequestBuilder> for NewRelicRequestBuilder { &self, mut input: Vec, ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - for event in input.iter_mut() { - self.transformer.transform(event); - } - let builder = RequestMetadataBuilder::from_events(&input); - let finalizers = input.take_finalizers(); - let api_model = || -> Result { - match self.credentials.api { - NewRelicApi::Events => { - Ok(NewRelicApiModel::Events(EventsApiModel::try_from(input)?)) - } - NewRelicApi::Metrics => { - Ok(NewRelicApiModel::Metrics(MetricsApiModel::try_from(input)?)) - } - NewRelicApi::Logs => Ok(NewRelicApiModel::Logs(LogsApiModel::try_from(input)?)), - } - }(); - - (finalizers, builder, api_model) + + (finalizers, builder, input) } fn build_request( @@ -135,7 +101,6 @@ impl RequestBuilder> for NewRelicRequestBuilder { pub struct NewRelicSink { pub service: S, - pub transformer: Transformer, pub encoder: NewRelicEncoder, pub credentials: Arc, pub compression: Compression, @@ -152,7 +117,6 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let builder_limit = NonZeroUsize::new(64); let request_builder = NewRelicRequestBuilder { - transformer: self.transformer, encoder: self.encoder, compression: self.compression, credentials: Arc::clone(&self.credentials), diff --git a/src/sinks/pulsar/encoder.rs b/src/sinks/pulsar/encoder.rs index 63d25f2e05711..e324c4b0ea376 100644 --- a/src/sinks/pulsar/encoder.rs +++ b/src/sinks/pulsar/encoder.rs @@ -6,6 +6,8 @@ use crate::{ use bytes::BytesMut; use std::io; use tokio_util::codec::Encoder as _; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf}; #[derive(Clone, Debug)] pub(super) struct PulsarEncoder { @@ -14,9 +16,17 @@ pub(super) struct PulsarEncoder { } impl Encoder for PulsarEncoder { - fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result { + fn encode_input( + &self, + mut input: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { let mut body = BytesMut::new(); self.transformer.transform(&mut input); + + let mut 
byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + let mut encoder = self.encoder.clone(); encoder .encode(input, &mut body) @@ -25,6 +35,6 @@ impl Encoder for PulsarEncoder { let body = body.freeze(); write_all(writer, 1, body.as_ref())?; - Ok(body.len()) + Ok((body.len(), byte_size)) } } diff --git a/src/sinks/splunk_hec/logs/encoder.rs b/src/sinks/splunk_hec/logs/encoder.rs index 695294cefae70..0f0270ce28a42 100644 --- a/src/sinks/splunk_hec/logs/encoder.rs +++ b/src/sinks/splunk_hec/logs/encoder.rs @@ -3,6 +3,8 @@ use std::borrow::Cow; use bytes::BytesMut; use serde::Serialize; use tokio_util::codec::Encoder as _; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf}; use super::sink::HecProcessedEvent; use crate::{ @@ -63,8 +65,9 @@ impl Encoder> for HecLogsEncoder { &self, input: Vec, writer: &mut dyn std::io::Write, - ) -> std::io::Result { + ) -> std::io::Result<(usize, GroupedCountByteSize)> { let mut encoder = self.encoder.clone(); + let mut byte_size = telemetry().create_request_count_byte_size(); let encoded_input: Vec = input .into_iter() .filter_map(|processed_event| { @@ -72,6 +75,8 @@ impl Encoder> for HecLogsEncoder { let metadata = processed_event.metadata; self.transformer.transform(&mut event); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + let mut bytes = BytesMut::new(); match metadata.endpoint_target { @@ -128,6 +133,6 @@ impl Encoder> for HecLogsEncoder { let encoded_size = encoded_input.len(); writer.write_all(encoded_input.as_slice())?; - Ok(encoded_size) + Ok((encoded_size, byte_size)) } } diff --git a/src/sinks/splunk_hec/metrics/encoder.rs b/src/sinks/splunk_hec/metrics/encoder.rs index fea4c058ea72c..5b0198825f9eb 100644 --- a/src/sinks/splunk_hec/metrics/encoder.rs +++ b/src/sinks/splunk_hec/metrics/encoder.rs @@ -1,6 +1,8 @@ use std::{collections::BTreeMap, iter}; use serde::Serialize; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf}; use super::sink::HecProcessedEvent; use crate::{internal_events::SplunkEventEncodeError, sinks::util::encoding::Encoder}; @@ -95,7 +97,12 @@ impl Encoder> for HecMetricsEncoder { &self, input: Vec, writer: &mut dyn std::io::Write, - ) -> std::io::Result { + ) -> std::io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + for event in &input { + byte_size.add_event(event, event.estimated_json_encoded_size_of()); + } + let encoded_input: Vec = input .into_iter() .filter_map(Self::encode_event) @@ -103,6 +110,6 @@ impl Encoder> for HecMetricsEncoder { .collect(); let encoded_size = encoded_input.len(); writer.write_all(encoded_input.as_slice())?; - Ok(encoded_size) + Ok((encoded_size, byte_size)) } } diff --git a/src/sinks/statsd/request_builder.rs b/src/sinks/statsd/request_builder.rs index 9cfdf119a08d3..08034cb101f5b 100644 --- a/src/sinks/statsd/request_builder.rs +++ b/src/sinks/statsd/request_builder.rs @@ -4,7 +4,11 @@ use bytes::BytesMut; use snafu::Snafu; use tokio_util::codec::Encoder; use vector_common::request_metadata::RequestMetadata; -use vector_core::event::{EventFinalizers, Finalizable, Metric}; +use vector_core::{ + config::telemetry, + event::{EventFinalizers, Finalizable, Metric}, + EstimatedJsonEncodedSizeOf, +}; use super::{encoder::StatsdEncoder, service::StatsdRequest}; use crate::{ @@ -79,6 +83,7 
@@ impl IncrementalRequestBuilder> for StatsdRequestBuilder { let mut metrics = input.drain(..); while metrics.len() != 0 || pending.is_some() { + let mut byte_size = telemetry().create_request_count_byte_size(); let mut n = 0; let mut request_buf = Vec::new(); @@ -95,6 +100,8 @@ impl IncrementalRequestBuilder> for StatsdRequestBuilder { }, }; + byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); + // Encode the metric. Once we've done that, see if it can fit into the request // buffer without exceeding the maximum request size limit. // @@ -131,7 +138,7 @@ impl IncrementalRequestBuilder> for StatsdRequestBuilder { // If we encoded one or more metrics this pass, finalize the request. if n > 0 { - let encode_result = EncodeResult::uncompressed(request_buf); + let encode_result = EncodeResult::uncompressed(request_buf, byte_size); let request_metadata = request_metadata_builder.build(&encode_result); results.push(Ok(( diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 00dc6944bdbad..b60e7ef177410 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -3,6 +3,8 @@ use std::io; use bytes::BytesMut; use codecs::encoding::Framer; use tokio_util::codec::Encoder as _; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf}; use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError}; @@ -12,7 +14,11 @@ pub trait Encoder { /// # Errors /// /// If an I/O error is encountered while encoding the input, an error variant will be returned. - fn encode_input(&self, input: T, writer: &mut dyn io::Write) -> io::Result; + fn encode_input( + &self, + input: T, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)>; } impl Encoder> for (Transformer, crate::codecs::Encoder) { @@ -20,13 +26,16 @@ impl Encoder> for (Transformer, crate::codecs::Encoder) { &self, mut events: Vec, writer: &mut dyn io::Write, - ) -> io::Result { + ) -> io::Result<(usize, GroupedCountByteSize)> { let mut encoder = self.1.clone(); let mut bytes_written = 0; let mut n_events_pending = events.len(); let batch_prefix = encoder.batch_prefix(); write_all(writer, n_events_pending, batch_prefix)?; bytes_written += batch_prefix.len(); + + let mut byte_size = telemetry().create_request_count_byte_size(); + if let Some(last) = events.pop() { for mut event in events { self.0.transform(&mut event); @@ -40,6 +49,11 @@ impl Encoder> for (Transformer, crate::codecs::Encoder) { } let mut event = last; self.0.transform(&mut event); + + // Ensure the json size is calculated after any fields have been removed + // by the transformer. 
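+ // This grouped size is returned alongside the bytes written and surfaces later as
+ // `EncodeResult::transformed_json_size` when the request metadata is built.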
+ byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + let mut bytes = BytesMut::new(); encoder .serialize(event, &mut bytes) @@ -53,20 +67,28 @@ impl Encoder> for (Transformer, crate::codecs::Encoder) { write_all(writer, 0, batch_suffix)?; bytes_written += batch_suffix.len(); - Ok(bytes_written) + Ok((bytes_written, byte_size)) } } impl Encoder for (Transformer, crate::codecs::Encoder<()>) { - fn encode_input(&self, mut event: Event, writer: &mut dyn io::Write) -> io::Result { + fn encode_input( + &self, + mut event: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { let mut encoder = self.1.clone(); self.0.transform(&mut event); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + let mut bytes = BytesMut::new(); encoder .serialize(event, &mut bytes) .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; write_all(writer, 1, &bytes)?; - Ok(bytes.len()) + Ok((bytes.len(), byte_size)) } } @@ -144,7 +166,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding.encode_input(vec![], &mut writer).unwrap(); + let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap(); assert_eq!(written, 2); assert_eq!(String::from_utf8(writer).unwrap(), "[]"); @@ -161,7 +183,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( vec![Event::Log(LogEvent::from(BTreeMap::from([( String::from("key"), @@ -186,7 +208,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( vec![ Event::Log(LogEvent::from(BTreeMap::from([( @@ -224,7 +246,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding.encode_input(vec![], &mut writer).unwrap(); + let (written, _json_size) = encoding.encode_input(vec![], &mut writer).unwrap(); assert_eq!(written, 0); assert_eq!(String::from_utf8(writer).unwrap(), ""); @@ -241,7 +263,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( vec![Event::Log(LogEvent::from(BTreeMap::from([( String::from("key"), @@ -266,7 +288,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( vec![ Event::Log(LogEvent::from(BTreeMap::from([( @@ -301,7 +323,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( Event::Log(LogEvent::from(BTreeMap::from([( String::from("key"), @@ -323,7 +345,7 @@ mod tests { ); let mut writer = Vec::new(); - let written = encoding + let (written, _json_size) = encoding .encode_input( Event::Log(LogEvent::from(BTreeMap::from([( String::from("message"), diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs index e6f4e7739e4d2..ccf39542c6bd3 100644 --- a/src/sinks/util/metadata.rs +++ b/src/sinks/util/metadata.rs @@ -79,6 +79,8 @@ impl RequestMetadataBuilder { .add_event(&event, json_size); } + /// Builds the [`RequestMetadata`] with the given size. + /// This is used when there is no encoder in the process to provide an `EncodeResult` pub fn with_request_size(&self, size: NonZeroUsize) -> RequestMetadata { let size = size.get(); @@ -91,6 +93,10 @@ impl RequestMetadataBuilder { ) } + /// Builds the [`RequestMetadata`] from the results of encoding. 
+ /// `EncodeResult` provides us with the byte size before and after compression + /// and the json size of the events after transforming (dropping unwanted fields) but + /// before encoding. pub fn build(&self, result: &EncodeResult) -> RequestMetadata { RequestMetadata::new( self.event_count, @@ -99,7 +105,9 @@ impl RequestMetadataBuilder { result .compressed_byte_size .unwrap_or(result.uncompressed_byte_size), - self.events_estimated_json_encoded_byte_size.clone(), + // Building from an encoded result, we take the json size from the encoded since that has the size + // after transforming the event. + result.transformed_json_size.clone(), ) } } diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs index 87280d2ec966e..86501f3a96a04 100644 --- a/src/sinks/util/request_builder.rs +++ b/src/sinks/util/request_builder.rs @@ -1,13 +1,14 @@ use std::io; use bytes::Bytes; -use vector_common::request_metadata::RequestMetadata; +use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata}; use super::{encoding::Encoder, metadata::RequestMetadataBuilder, Compression, Compressor}; pub struct EncodeResult

{ pub payload: P, pub uncompressed_byte_size: usize, + pub transformed_json_size: GroupedCountByteSize, pub compressed_byte_size: Option<usize>, } @@ -15,20 +16,26 @@ impl<P> EncodeResult<P>

where P: AsRef<[u8]>, { - pub fn uncompressed(payload: P) -> Self { + pub fn uncompressed(payload: P, transformed_json_size: GroupedCountByteSize) -> Self { let uncompressed_byte_size = payload.as_ref().len(); Self { payload, uncompressed_byte_size, + transformed_json_size, compressed_byte_size: None, } } - pub fn compressed(payload: P, uncompressed_byte_size: usize) -> Self { + pub fn compressed( + payload: P, + uncompressed_byte_size: usize, + transformed_json_size: GroupedCountByteSize, + ) -> Self { let compressed_byte_size = payload.as_ref().len(); Self { payload, uncompressed_byte_size, + transformed_json_size, compressed_byte_size: Some(compressed_byte_size), } } @@ -74,14 +81,14 @@ pub trait RequestBuilder { // of clash-y with `Self::Metadata`. let mut compressor = Compressor::from(self.compression()); let is_compressed = compressor.is_compressed(); - _ = self.encoder().encode_input(events, &mut compressor)?; + let (_, json_size) = self.encoder().encode_input(events, &mut compressor)?; let payload = compressor.into_inner().freeze(); let result = if is_compressed { let compressed_byte_size = payload.len(); - EncodeResult::compressed(payload.into(), compressed_byte_size) + EncodeResult::compressed(payload.into(), compressed_byte_size, json_size) } else { - EncodeResult::uncompressed(payload.into()) + EncodeResult::uncompressed(payload.into(), json_size) }; Ok(result) diff --git a/src/sinks/webhdfs/test.rs b/src/sinks/webhdfs/test.rs index 0a558c3470225..7d4aa66e2e251 100644 --- a/src/sinks/webhdfs/test.rs +++ b/src/sinks/webhdfs/test.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; +use vector_common::request_metadata::GroupedCountByteSize; use vector_core::partition::Partitioner; use super::config::WebHdfsConfig; @@ -66,7 +67,8 @@ fn build_request(compression: Compression) -> OpenDalRequest { let request_builder = request_builder(&sink_config); let (metadata, metadata_request_builder, _events) = request_builder.split_input((key, vec![log])); - let payload = EncodeResult::uncompressed(Bytes::new()); + let byte_size = GroupedCountByteSize::new_untagged(); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); let request_metadata = metadata_request_builder.build(&payload); request_builder.build_request(metadata, request_metadata, payload) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 615029f8cdae5..868508b2d0f78 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -22,6 +22,7 @@ use vector_config::configurable_component; use vector_core::{ config::{LegacyKey, LogNamespace}, event::BatchNotifier, + schema::meaning, EstimatedJsonEncodedSizeOf, }; use vrl::value::{kind::Collection, Kind}; @@ -184,7 +185,7 @@ impl SourceConfig for SplunkConfig { .with_event_field( &owned_value_path!(log_schema().message_key()), Kind::bytes().or_undefined(), - Some("message"), + Some(meaning::MESSAGE), ) .with_event_field( &owned_value_path!("line"), @@ -207,7 +208,7 @@ impl SourceConfig for SplunkConfig { .map(LegacyKey::InsertIfEmpty), &owned_value_path!("host"), Kind::bytes(), - Some("host"), + Some(meaning::HOST), ) .with_source_metadata( SplunkConfig::NAME, @@ -228,7 +229,7 @@ impl SourceConfig for SplunkConfig { Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))), &owned_value_path!("source"), Kind::bytes(), - Some("service"), + Some(meaning::SERVICE), ) // Not to be confused with `source_type`. .with_source_metadata(