Skip to content

Commit

Permalink
chore(prometheus_remote_write sink): remote write sink rewrite (vecto…
Browse files Browse the repository at this point in the history
…rdotdev#18676)

* Refactor prometheus remote write sink

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* WIP

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Made tests pass

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Use shared compression

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Some comments

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Use HttpResponse

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update request builder default

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update PartitionBatcher to use BatchConfig

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update PartitionBatcher to use BatchConfig

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Remove zorkwonk

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Make into fns as fns instead

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Spelling

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Don't box the closure

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Only insert timeout if we add the batch to the list

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Allow the timer to remove an item

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Allow partitions to be types other than Vec

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Added test for aggregation

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Allow a custom object to be used for the reducer

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Make aggregating optional

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Adde test for non aggregation

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Component docs

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Kyle and Doug

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Use generic compression options

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Default compression to Snappy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Update docs

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Add snappy to the compression docs

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Remove proptest file

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Snappy is no longer optional

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely committed Oct 25, 2023
1 parent a1863e6 commit 5c1707f
Show file tree
Hide file tree
Showing 41 changed files with 1,597 additions and 959 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ seahash = { version = "4.1.0", default-features = false }
semver = { version = "1.0.20", default-features = false, features = ["serde", "std"], optional = true }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
snafu = { version = "0.7.5", default-features = false, features = ["futures"] }
snap = { version = "1.1.0", default-features = false, optional = true }
snap = { version = "1.1.0", default-features = false }
socket2 = { version = "0.5.5", default-features = false }
stream-cancel = { version = "0.8.1", default-features = false }
strip-ansi-escapes = { version = "0.2.0", default-features = false }
Expand Down Expand Up @@ -560,9 +560,9 @@ sources-splunk_hec = ["dep:roaring"]
sources-statsd = ["sources-utils-net", "tokio-util/net"]
sources-stdin = ["tokio-util/io"]
sources-syslog = ["codecs-syslog", "sources-utils-net", "tokio-util/net"]
sources-utils-http = ["dep:snap", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"]
sources-utils-http = ["sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"]
sources-utils-http-auth = ["sources-utils-http-error"]
sources-utils-http-encoding = ["dep:snap", "sources-utils-http-error"]
sources-utils-http-encoding = ["sources-utils-http-error"]
sources-utils-http-error = []
sources-utils-http-prelude = ["sources-utils-http", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error"]
sources-utils-http-query = []
Expand Down Expand Up @@ -715,7 +715,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-prometheus = ["dep:base64", "dep:prometheus-parser"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
Expand Down
55 changes: 26 additions & 29 deletions lib/vector-stream/src/partitioned_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf};

use crate::batcher::{
config::BatchConfigParts,
data::BatchReduce,
data::BatchData,
limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit},
BatchConfig,
};
Expand Down Expand Up @@ -155,16 +155,15 @@ impl BatcherSettings {
}

/// A batcher config using the `ItemBatchSize` trait to determine batch sizes.
/// The output is built with the supplied reducer function.
pub fn into_reducer_config<I, T, F, S>(
self,
/// The output is built with the supplied object implementing [`BatchData`].
pub fn as_reducer_config<I, T, B>(
&self,
item_size: I,
reducer: F,
) -> BatchConfigParts<SizeLimit<I>, BatchReduce<F, S>>
reducer: B,
) -> BatchConfigParts<SizeLimit<I>, B>
where
I: ItemBatchSize<T>,
F: FnMut(&mut S, T),
S: Default,
B: BatchData<T>,
{
BatchConfigParts {
batch_limiter: SizeLimit {
Expand All @@ -173,14 +172,14 @@ impl BatcherSettings {
current_size: 0,
item_size_calculator: item_size,
},
batch_data: BatchReduce::new(reducer),
batch_data: reducer,
timeout: self.timeout,
}
}
}

#[pin_project]
pub struct PartitionedBatcher<St, Prt, KT, C, F>
pub struct PartitionedBatcher<St, Prt, KT, C, F, B>
where
Prt: Partitioner,
{
Expand All @@ -193,7 +192,7 @@ where
/// The store of 'closed' batches. When this is not empty it will be
/// preferentially flushed prior to consuming any new items from the
/// underlying stream.
closed_batches: Vec<(Prt::Key, Vec<Prt::Item>)>,
closed_batches: Vec<(Prt::Key, B)>,
/// The queue of pending batch expirations
timer: KT,
/// The partitioner for this `Batcher`
Expand All @@ -203,7 +202,7 @@ where
stream: Fuse<St>,
}

impl<St, Prt, C, F> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F>
impl<St, Prt, C, F, B> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F, B>
where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Expand All @@ -226,7 +225,7 @@ where
}

#[cfg(test)]
impl<St, Prt, KT, C, F> PartitionedBatcher<St, Prt, KT, C, F>
impl<St, Prt, KT, C, F, B> PartitionedBatcher<St, Prt, KT, C, F, B>
where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Expand All @@ -247,17 +246,17 @@ where
}
}

impl<St, Prt, KT, C, F> Stream for PartitionedBatcher<St, Prt, KT, C, F>
impl<St, Prt, KT, C, F, B> Stream for PartitionedBatcher<St, Prt, KT, C, F, B>
where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Prt::Key: Eq + Hash + Clone,
Prt::Item: ByteSizeOf,
KT: KeyedTimer<Prt::Key>,
C: BatchConfig<Prt::Item, Batch = Vec<Prt::Item>>,
C: BatchConfig<Prt::Item, Batch = B>,
F: Fn() -> C + Send,
{
type Item = (Prt::Key, Vec<Prt::Item>);
type Item = (Prt::Key, B);

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
Expand All @@ -270,20 +269,18 @@ where
return Poll::Ready(this.closed_batches.pop());
}
match this.stream.as_mut().poll_next(cx) {
Poll::Pending => {
match this.timer.poll_expired(cx) {
// Unlike normal streams, `DelayQueue` can return `None`
// here but still be usable later if more entries are added.
Poll::Pending | Poll::Ready(None) => return Poll::Pending,
Poll::Ready(Some(item_key)) => {
let mut batch = this
.batches
.remove(&item_key)
.expect("batch should exist if it is set to expire");
this.closed_batches.push((item_key, batch.take_batch()));
}
Poll::Pending => match this.timer.poll_expired(cx) {
// Unlike normal streams, `DelayQueue` can return `None`
// here but still be usable later if more entries are added.
Poll::Pending | Poll::Ready(None) => return Poll::Pending,
Poll::Ready(Some(item_key)) => {
let mut batch = this
.batches
.remove(&item_key)
.expect("batch should exist if it is set to expire");
this.closed_batches.push((item_key, batch.take_batch()));
}
}
},
Poll::Ready(None) => {
// Now that the underlying stream is closed, we need to
// clear out our batches, including all expiration
Expand Down
Loading

0 comments on commit 5c1707f

Please sign in to comment.