Skip to content

Commit

Permalink
feat(file sink): add zstd compression
Browse files Browse the repository at this point in the history
  • Loading branch information
hdhoang committed Dec 19, 2020
1 parent 7cc42be commit ea578aa
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -135,6 +135,7 @@ hyper-openssl = "0.8"
openssl = "0.10.30"
openssl-probe = "0.1.2"
flate2 = "1.0.19"
zstd = "0.5.1"
async-compression = { version = "0.3.6", features = ["tokio-02", "gzip", "zstd"] }
structopt = "0.3.21"
indexmap = {version = "1.5.1", features = ["serde-1"]}
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/components/sinks.cue
Expand Up @@ -127,6 +127,9 @@ components: sinks: [Name=string]: {
if list.Contains(sinks[Name].features.send.compression.algorithms, "gzip") {
gzip: "[Gzip](\(urls.gzip)) standard DEFLATE compression."
}
if list.Contains(sinks[Name].features.send.compression.algorithms, "zstd") {
zstd: "[Zstd](\(urls.zstd)) Zstandard compression."
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/components/sinks/file.cue
Expand Up @@ -19,7 +19,7 @@ components: sinks: file: {
compression: {
enabled: true
default: "none"
algorithms: ["none", "gzip"]
algorithms: ["none", "gzip", "zstd"]
levels: ["none", "fast", "default", "best", 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
}
encoding: {
Expand Down
38 changes: 35 additions & 3 deletions src/sinks/file/mod.rs
Expand Up @@ -11,6 +11,7 @@ use crate::{
template::Template,
};
use async_compression::tokio_02::write::GzipEncoder;
use async_compression::tokio_02::write::ZstdEncoder;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{
Expand Down Expand Up @@ -68,8 +69,9 @@ pub enum Encoding {
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
Gzip,
None,
Gzip,
Zstd,
}

impl Default for Compression {
Expand All @@ -81,34 +83,39 @@ impl Default for Compression {
enum OutFile {
Regular(File),
Gzip(GzipEncoder<File>),
Zstd(ZstdEncoder<File>),
}

impl OutFile {
fn new(file: File, compression: Compression) -> Self {
match compression {
Compression::None => OutFile::Regular(file),
Compression::Gzip => OutFile::Gzip(GzipEncoder::new(file)),
Compression::Zstd => OutFile::Zstd(ZstdEncoder::new(file)),
}
}

async fn sync_all(&mut self) -> Result<(), std::io::Error> {
match self {
OutFile::Regular(file) => file.sync_all().await,
OutFile::Gzip(gzip) => gzip.get_mut().sync_all().await,
OutFile::Zstd(zstd) => zstd.get_mut().sync_all().await,
}
}

async fn shutdown(&mut self) -> Result<(), std::io::Error> {
match self {
OutFile::Regular(file) => file.shutdown().await,
OutFile::Gzip(gzip) => gzip.shutdown().await,
OutFile::Zstd(zstd) => zstd.shutdown().await,
}
}

async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
match self {
OutFile::Regular(file) => file.write_all(src).await,
OutFile::Gzip(gzip) => gzip.write_all(src).await,
OutFile::Zstd(zstd) => zstd.write_all(src).await,
}
}

Expand Down Expand Up @@ -344,8 +351,8 @@ impl StreamSink for FileSink {
mod tests {
use super::*;
use crate::test_util::{
lines_from_file, lines_from_gzip_file, random_events_with_stream, random_lines_with_stream,
temp_dir, temp_file, trace_init,
lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
random_lines_with_stream, temp_dir, temp_file, trace_init,
};
use futures::stream;
use std::convert::TryInto;
Expand Down Expand Up @@ -405,6 +412,31 @@ mod tests {
}
}

#[tokio::test]
async fn single_partition_zstd() {
trace_init();

let template = temp_file();

let config = FileSinkConfig {
path: template.clone().try_into().unwrap(),
idle_timeout_secs: None,
encoding: Encoding::Text.into(),
compression: Compression::Zstd,
};

let mut sink = FileSink::new(&config, Acker::Null);
let (input, _) = random_lines_with_stream(100, 64);

let events = Box::pin(stream::iter(input.clone().into_iter().map(Event::from)));
sink.run(events).await.unwrap();

let output = lines_from_zstd_file(template);
for (input, output) in input.into_iter().zip(output) {
assert_eq!(input, output);
}
}

#[tokio::test]
async fn many_partitions() {
trace_init();
Expand Down
12 changes: 12 additions & 0 deletions src/test_util/mod.rs
Expand Up @@ -38,6 +38,7 @@ use tokio::{
time::{delay_for, Duration, Instant},
};
use tokio_util::codec::{Encoder, FramedRead, FramedWrite, LinesCodec};
use zstd::Decoder as ZstdDecoder;

pub mod stats;

Expand Down Expand Up @@ -284,6 +285,17 @@ pub fn lines_from_gzip_file<P: AsRef<Path>>(path: P) -> Vec<String> {
output.lines().map(|s| s.to_owned()).collect()
}

pub fn lines_from_zstd_file<P: AsRef<Path>>(path: P) -> Vec<String> {
trace!(message = "Reading zstd file.", path = %path.as_ref().display());
let file = File::open(path).unwrap();
let mut output = String::new();
ZstdDecoder::new(file)
.unwrap()
.read_to_string(&mut output)
.unwrap();
output.lines().map(|s| s.to_owned()).collect()
}

pub fn runtime() -> runtime::Runtime {
runtime::Builder::new()
.threaded_scheduler()
Expand Down

0 comments on commit ea578aa

Please sign in to comment.