Skip to content

Commit

Permalink
fix(buffers): Actually read 100 events at a time (vectordotdev#8541)
Browse files Browse the repository at this point in the history
Works around skade/leveldb#43

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
Signed-off-by: dbcfd <bdbrowning2@gmail.com>
  • Loading branch information
jszwedko authored and dbcfd committed Aug 18, 2021
1 parent 23ea521 commit ced7ea6
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
Expand Up @@ -71,7 +71,7 @@ where
/// Sizes in bytes of read, not acked/deleted, events.
pub(crate) unacked_sizes: VecDeque<usize>,
/// Buffer for internal use.
pub(crate) buffer: VecDeque<Vec<u8>>,
pub(crate) buffer: VecDeque<(Key, Vec<u8>)>,
/// Limit on uncompacted_size after which we trigger compaction.
pub(crate) max_uncompacted_size: usize,
/// Last time that compaction was triggered.
Expand Down Expand Up @@ -111,16 +111,16 @@ where
tokio::task::block_in_place(|| {
this.buffer.extend(
this.db
.value_iter(ReadOptions::new())
.iter(ReadOptions::new())
.from(&Key(this.read_offset))
.to(&Key(this.read_offset + 100)),
.take(100),
);
});
}

if let Some(value) = this.buffer.pop_front() {
if let Some((key, value)) = this.buffer.pop_front() {
this.unacked_sizes.push_back(value.len());
this.read_offset += 1;
this.read_offset = key.0;

let buffer: Bytes = Bytes::from(value);
match T::decode(buffer) {
Expand Down

0 comments on commit ced7ea6

Please sign in to comment.