Skip to content

Commit

Permalink
fix(loki sink): update to use the global list of compression algorith…
Browse files Browse the repository at this point in the history
…ms (vectordotdev#19099)

* Update to use the global list of compression algorithms

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Mention snappy compression using protocol buffers

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Snappy is compressed separately

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Rely on framework compression for Snappy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update encoder test

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored and jszwedko committed Nov 15, 2023
1 parent a59329a commit 218963a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 66 deletions.
12 changes: 5 additions & 7 deletions lib/loki-logproto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ pub mod util {
let streams: Vec<logproto::StreamAdapter> =
self.0.into_iter().map(|stream| stream.into()).collect();
let push_request = logproto::PushRequest { streams };
let buf = push_request.encode_to_vec();
let mut encoder = snap::raw::Encoder::new();
encoder.compress_vec(&buf).expect("out of memory")
push_request.encode_to_vec()
}
}

Expand Down Expand Up @@ -121,10 +119,10 @@ mod tests {
let batch = Batch(vec![Stream(labels, vec![entry1, entry2])]);
// generated by test codes from promtail
let expect = vec![
62, 176, 10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116,
111, 98, 117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144,
142, 6, 18, 5, 104, 101, 108, 108, 111, 5, 17, 44, 183, 204, 144, 142, 6, 18, 5, 119,
111, 114, 108, 100,
10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98,
117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18,
5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111,
114, 108, 100,
];
let buf = batch.encode();
assert_eq!(expect, buf);
Expand Down
45 changes: 6 additions & 39 deletions src/sinks/loki/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,8 @@ use crate::{
sinks::{prelude::*, util::UriSerde},
};

/// Loki-specific compression.
#[configurable_component]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ExtendedCompression {
/// Snappy compression.
///
/// This implies sending push requests as Protocol Buffers.
#[serde(rename = "snappy")]
Snappy,
}

/// Compression configuration.
#[configurable_component]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[serde(untagged)]
pub enum CompressionConfigAdapter {
/// Basic compression.
Original(Compression),

/// Loki-specific compression.
Extended(ExtendedCompression),
}

impl CompressionConfigAdapter {
pub const fn content_encoding(self) -> Option<&'static str> {
match self {
CompressionConfigAdapter::Original(compression) => compression.content_encoding(),
CompressionConfigAdapter::Extended(_) => Some("snappy"),
}
}
}

impl Default for CompressionConfigAdapter {
fn default() -> Self {
CompressionConfigAdapter::Extended(ExtendedCompression::Snappy)
}
const fn default_compression() -> Compression {
Compression::Snappy
}

fn default_loki_path() -> String {
Expand Down Expand Up @@ -106,9 +72,10 @@ pub struct LokiConfig {
#[serde(default = "crate::serde::default_true")]
pub remove_timestamp: bool,

#[configurable(derived)]
#[serde(default)]
pub compression: CompressionConfigAdapter,
/// Compression configuration.
/// Snappy compression implies sending push requests as Protocol Buffers.
#[serde(default = "default_compression")]
pub compression: Compression,

#[configurable(derived)]
#[serde(default)]
Expand Down
9 changes: 3 additions & 6 deletions src/sinks/loki/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use http::StatusCode;
use snafu::Snafu;
use tracing::Instrument;

use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression};
use crate::{
http::{Auth, HttpClient},
sinks::{prelude::*, util::UriSerde},
Expand Down Expand Up @@ -60,7 +59,7 @@ impl DriverResponse for LokiResponse {

#[derive(Clone)]
pub struct LokiRequest {
pub compression: CompressionConfigAdapter,
pub compression: Compression,
pub finalizers: EventFinalizers,
pub payload: Bytes,
pub tenant_id: Option<String>,
Expand Down Expand Up @@ -113,10 +112,8 @@ impl Service<LokiRequest> for LokiService {

fn call(&mut self, request: LokiRequest) -> Self::Future {
let content_type = match request.compression {
CompressionConfigAdapter::Original(_) => "application/json",
CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => {
"application/x-protobuf"
}
Compression::Snappy => "application/x-protobuf",
_ => "application/json",
};
let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type);

Expand Down
14 changes: 4 additions & 10 deletions src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use super::{
event::{LokiBatchEncoder, LokiEvent, LokiRecord, PartitionKey},
service::{LokiRequest, LokiRetryLogic, LokiService},
};
use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression};
use crate::sinks::loki::event::LokiBatchEncoding;
use crate::{
http::{get_http_scheme_from_uri, HttpClient},
Expand Down Expand Up @@ -65,7 +64,7 @@ impl Partitioner for RecordPartitioner {

#[derive(Clone)]
pub struct LokiRequestBuilder {
compression: CompressionConfigAdapter,
compression: Compression,
encoder: LokiBatchEncoder,
}

Expand All @@ -92,10 +91,7 @@ impl RequestBuilder<(PartitionKey, Vec<LokiRecord>)> for LokiRequestBuilder {
type Error = RequestBuildError;

fn compression(&self) -> Compression {
match self.compression {
CompressionConfigAdapter::Original(compression) => compression,
CompressionConfigAdapter::Extended(_) => Compression::None,
}
self.compression
}

fn encoder(&self) -> &Self::Encoder {
Expand Down Expand Up @@ -415,10 +411,8 @@ impl LokiSink {
let serializer = config.encoding.build()?;
let encoder = Encoder::<()>::new(serializer);
let batch_encoder = match config.compression {
CompressionConfigAdapter::Original(_) => LokiBatchEncoder(LokiBatchEncoding::Json),
CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => {
LokiBatchEncoder(LokiBatchEncoding::Protobuf)
}
Compression::Snappy => LokiBatchEncoder(LokiBatchEncoding::Protobuf),
_ => LokiBatchEncoder(LokiBatchEncoding::Json),
};

Ok(Self {
Expand Down
11 changes: 7 additions & 4 deletions website/cue/reference/components/sinks/base/loki.cue
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ base: components: sinks: loki: configuration: {
}
}
compression: {
description: "Compression configuration."
required: false
description: """
Compression configuration.
Snappy compression implies sending push requests as Protocol Buffers.
"""
required: false
type: string: {
default: "snappy"
enum: {
Expand All @@ -122,9 +125,9 @@ base: components: sinks: loki: configuration: {
"""
none: "No compression."
snappy: """
Snappy compression.
[Snappy][snappy] compression.
This implies sending push requests as Protocol Buffers.
[snappy]: https://github.com/google/snappy/blob/main/docs/README.md
"""
zlib: """
[Zlib][zlib] compression.
Expand Down

0 comments on commit 218963a

Please sign in to comment.