Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ memchr = { version = "2", default-features = false }
metrics.workspace = true
opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
ordered-float.workspace = true
parquet = { version = "39.0.0", default-features = false }
pin-project.workspace = true
prost.workspace = true
prost-reflect.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion lib/codecs/src/encoding/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vector_config::configurable_component;

use super::{Encoder, EncoderKind, Transformer};
use super::{BatchSerializer, Encoder, EncoderKind, Transformer};
use crate::encoding::{
CharacterDelimitedEncoder, Framer, FramingConfig, LengthDelimitedEncoder,
NewlineDelimitedEncoder, Serializer, SerializerConfig,
Expand Down Expand Up @@ -149,6 +149,12 @@ impl EncodingConfigWithFraming {
let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
Ok((self.transformer(), encoder))
}

/// Construct the batch-level serializer described by this configuration.
///
/// Delegates to the serializer config held in `self.encoding`. Per the
/// original documentation, the result is `Some` only when the configured
/// serializer operates on whole batches (e.g. Parquet) and `None` for
/// serializers that work event-by-event.
///
/// # Errors
///
/// Propagates whatever error the underlying serializer config reports
/// while building.
pub fn build_batched(&self) -> vector_common::Result<Option<BatchSerializer>> {
    let serializer_config = self.encoding.config();
    serializer_config.build_batched()
}
}

/// The way a sink processes outgoing events.
Expand Down
20 changes: 14 additions & 6 deletions lib/codecs/src/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vector_core::event::Event;
#[cfg(feature = "arrow")]
use crate::encoding::ArrowStreamSerializer;
use crate::{
encoding::{Error, Framer, Serializer},
encoding::{Error, Framer, ParquetSerializer, Serializer},
internal_events::{EncoderFramingError, EncoderSerializeError},
};

Expand All @@ -16,6 +16,14 @@ pub enum BatchSerializer {
/// Arrow IPC stream format serializer.
#[cfg(feature = "arrow")]
Arrow(ArrowStreamSerializer),
/// Apache Parquet file format serializer.
Parquet(ParquetSerializer),
}

impl From<ParquetSerializer> for BatchSerializer {
fn from(serializer: ParquetSerializer) -> Self {
Self::Parquet(serializer)
}
}

/// An encoder that encodes batches of events.
Expand All @@ -36,20 +44,19 @@ impl BatchEncoder {
}

/// Get the HTTP content type.
///
/// Returns a static MIME type string selected by the active
/// [`BatchSerializer`] variant:
///
/// - `Arrow` (only compiled with the `arrow` feature):
///   `application/vnd.apache.arrow.stream`
/// - `Parquet`: `application/vnd.apache.parquet`
// NOTE(review): this text comes from a rendered diff in which removed and
// added lines are interleaved. The function-level `cfg` directly below looks
// like the pre-change line (the same attribute now appears on the `Arrow`
// arm) — confirm against the merged file before relying on it.
#[cfg(feature = "arrow")]
pub const fn content_type(&self) -> &'static str {
match &self.serializer {
// The Arrow variant only exists when the `arrow` feature is enabled, so
// its arm is gated the same way.
#[cfg(feature = "arrow")]
BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
BatchSerializer::Parquet(_) => "application/vnd.apache.parquet",
}
}
}

impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
type Error = Error;

#[allow(unused_variables)]
fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
#[allow(unreachable_patterns)]
match &mut self.serializer {
#[cfg(feature = "arrow")]
BatchSerializer::Arrow(serializer) => {
Expand All @@ -63,7 +70,9 @@ impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
}
})
}
_ => unreachable!("BatchSerializer cannot be constructed without encode()"),
BatchSerializer::Parquet(serializer) => serializer
.encode(events, buffer)
.map_err(Error::SerializingError),
}
}
}
Expand All @@ -74,7 +83,6 @@ pub enum EncoderKind {
/// Uses framing to encode individual events
Framed(Box<Encoder<Framer>>),
/// Encodes events in batches without framing
#[cfg(feature = "arrow")]
Batch(BatchEncoder),
}

Expand Down
2 changes: 2 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod native;
mod native_json;
#[cfg(feature = "opentelemetry")]
mod otlp;
mod parquet;
mod protobuf;
mod raw_message;
#[cfg(feature = "syslog")]
Expand All @@ -39,6 +40,7 @@ pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
#[cfg(feature = "opentelemetry")]
pub use otlp::{OtlpSerializer, OtlpSerializerConfig};
pub use parquet::{ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
#[cfg(feature = "syslog")]
Expand Down
Loading
Loading