Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions src/segment/block/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,19 @@ impl<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Decoder<'a, Item, Pa
Item::parse_restart_key(&mut cursor, pos, bytes).expect("should exist")
}

fn partition_point(
&self,
pred: impl Fn(&[u8]) -> bool,
) -> Option<(/* offset */ usize, /* idx */ usize)> {
fn partition_point<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
where
F: Fn(&[u8]) -> bool,
{
// The first pass over the binary index emulates `Iterator::partition_point` over the
// restart heads that are in natural key order. We keep track of both the byte offset and
// the restart index because callers need the offset to seed the linear scanner, while the
// index is sometimes reused (for example by `seek_upper`).
//
// In contrast to the usual `partition_point`, we intentionally return the *last* restart
// entry when the predicate continues to hold for every head key. Forward scans rely on
// this behaviour to land on the final restart interval and resume the linear scan there
// instead of erroneously reporting "not found".
let binary_index = self.get_binary_index_reader();

debug_assert!(
Expand Down Expand Up @@ -198,16 +207,27 @@ impl<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Decoder<'a, Item, Pa
return Some((0, 0));
}

if left == binary_index.len() {
let idx = binary_index.len() - 1;
let offset = binary_index.get(idx);
return Some((offset, idx));
}

let offset = binary_index.get(left - 1);

Some((offset, left - 1))
}

// TODO:
fn partition_point_2(
&self,
pred: impl Fn(&[u8]) -> bool,
) -> Option<(/* offset */ usize, /* idx */ usize)> {
fn partition_point_2<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
where
F: Fn(&[u8]) -> bool,
{
// `partition_point_2` mirrors `partition_point` but keeps the *next* restart entry instead
// of the previous one. This variant is used exclusively by reverse scans (`seek_upper`)
// that want the first restart whose head key exceeds the predicate. Returning the raw
// offset preserves the ability to reuse linear scanning infrastructure without duplicating
// decoder logic.
let binary_index = self.get_binary_index_reader();

debug_assert!(
Expand Down Expand Up @@ -257,16 +277,36 @@ impl<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Decoder<'a, Item, Pa
pub fn seek(&mut self, pred: impl Fn(&[u8]) -> bool, second_partition: bool) -> bool {
// TODO: make this nicer, maybe predicate that can affect the resulting index...?
let result = if second_partition {
self.partition_point_2(pred)
self.partition_point_2(&pred)
} else {
self.partition_point(pred)
self.partition_point(&pred)
};

// Binary index lookup
let Some((offset, _)) = result else {
return false;
};

if second_partition && self.restart_interval == 1 && pred(self.get_key_at(offset)) {
// `second_partition == true` means we ran the "look one restart ahead" search used by
// index blocks. When the predicate is still true at the chosen restart head it means
// the caller asked us to seek strictly beyond the last entry. In that case we skip any
// costly parsing and flip both scanners into an "exhausted" state so the outer iterator
// immediately reports EOF.
let end = self.block.data.len();

self.lo_scanner.offset = end;
self.lo_scanner.remaining_in_interval = 0;
self.lo_scanner.base_key_offset = None;

self.hi_scanner.offset = end;
self.hi_scanner.ptr_idx = usize::MAX;
self.hi_scanner.stack.clear();
self.hi_scanner.base_key_offset = Some(0);

return false;
}

self.lo_scanner.offset = offset;

true
Expand All @@ -275,11 +315,11 @@ impl<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Decoder<'a, Item, Pa
/// Seeks the upper bound using the given predicate.
///
/// Returns `false` if the key does not possible exist.
pub fn seek_upper(&mut self, pred: impl Fn(&[u8]) -> bool, second_partition: bool) -> bool {
pub fn seek_upper(&mut self, mut pred: impl Fn(&[u8]) -> bool, second_partition: bool) -> bool {
let result = if second_partition {
self.partition_point_2(pred)
self.partition_point_2(&pred)
} else {
self.partition_point(pred)
self.partition_point(&pred)
};

// Binary index lookup
Expand Down
36 changes: 32 additions & 4 deletions src/segment/index_block/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,7 @@ mod tests {
Ok(())
}

// TODO: seek and seek_upper need separate binary search routines...?
// TODO: because seeking in [a,b,c] to e should return None for seek,
// TODO: but not for seek_upper
#[test]
#[ignore = "should not seek"]
fn v3_index_block_iter_too_far() -> crate::Result<()> {
let items = [
KeyedBlockHandle::new(b"b".into(), BlockOffset(0), 6_000),
Expand Down Expand Up @@ -309,6 +305,38 @@ mod tests {
Ok(())
}

#[test]
fn v3_index_block_iter_too_far_next_back() -> crate::Result<()> {
let items = [
KeyedBlockHandle::new(b"b".into(), BlockOffset(0), 6_000),
KeyedBlockHandle::new(b"bcdef".into(), BlockOffset(6_000), 7_000),
KeyedBlockHandle::new(b"def".into(), BlockOffset(13_000), 5_000),
];

let bytes = IndexBlock::encode_into_vec(&items)?;

let index_block = IndexBlock::new(Block {
data: bytes.into(),
header: Header {
block_type: BlockType::Index,
checksum: Checksum::from_raw(0),
data_length: 0,
uncompressed_length: 0,
},
});

let mut iter = index_block.iter();
assert!(!iter.seek(b"zzz"), "should not seek");

assert!(iter.next().is_none(), "iterator should be exhausted");
assert!(
iter.next_back().is_none(),
"reverse iterator should also be exhausted"
);

Ok(())
}

#[test]
fn v3_index_block_iter_span() -> crate::Result<()> {
let items = [
Expand Down
87 changes: 70 additions & 17 deletions src/segment/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,39 +130,63 @@ impl Iterator for Iter {
type Item = crate::Result<InternalValue>;

fn next(&mut self) -> Option<Self::Item> {
// Always try to keep iterating inside the already-materialized low data block first; this
// lets callers consume multiple entries without touching the index or cache again.
if let Some(block) = &mut self.lo_data_block {
if let Some(item) = block.next().map(Ok) {
return Some(item);
}
}

if !self.index_initialized {
// The index iterator is lazy-initialized on the first call so that the constructor does
// not eagerly seek. This is important because range bounds might be configured *after*
// `Iter::new`, and we only want to pay the seek cost if iteration actually happens.
let mut ok = true;

if let Some(key) = &self.range.0 {
self.index_iter.seek_lower(key);
// Seek to the first block whose end key is ≥ lower bound. If this fails we can
// immediately conclude the range is empty.
ok = self.index_iter.seek_lower(key);
}
if let Some(key) = &self.range.1 {
self.index_iter.seek_upper(key);

if ok {
if let Some(key) = &self.range.1 {
// Narrow the iterator further by skipping any blocks strictly above the upper
// bound. Again, a miss means the range is empty.
ok = self.index_iter.seek_upper(key);
}
}

self.index_initialized = true;

if !ok {
// No block in the index overlaps the requested window, so we clear state and return
// EOF without attempting to touch any data blocks.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
}
}

loop {
let Some(handle) = self.index_iter.next() else {
// NOTE: No more block handles from index,
// Now check hi buffer if it exists
// No more block handles coming from the index. Flush any pending items buffered on
// the high side (used by reverse iteration) before signalling completion.
if let Some(block) = &mut self.hi_data_block {
if let Some(item) = block.next().map(Ok) {
return Some(item);
}
}

// NOTE: If there is no more item, we are done
// Nothing left to serve; drop both buffers so the iterator can be reused safely.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
};

// NOTE: Load next lo block
// Load the next data block referenced by the index handle. We try the shared block
// cache first to avoid hitting the filesystem, and fall back to `load_block` on miss.
#[allow(clippy::single_match_else)]
let block = match self.cache.get_block(self.segment_id, handle.offset()) {
Some(block) => block,
Expand All @@ -185,9 +209,12 @@ impl Iterator for Iter {
let mut reader = create_data_block_reader(block);

if let Some(key) = &self.range.0 {
// Each block is self-contained, so we have to apply range bounds again to discard
// entries that precede the requested lower key.
reader.seek_lower(key, SeqNo::MAX);
}
if let Some(key) = &self.range.1 {
// Ditto for the upper bound: advance the block-local iterator to the right spot.
reader.seek_upper(key, SeqNo::MAX);
}

Expand All @@ -197,6 +224,8 @@ impl Iterator for Iter {
self.lo_data_block = Some(reader);

if let Some(item) = item {
// Serving the first item immediately avoids stashing it in a temporary buffer and
// keeps block iteration semantics identical to the simple case at the top.
return Some(Ok(item));
}
}
Expand All @@ -205,39 +234,57 @@ impl Iterator for Iter {

impl DoubleEndedIterator for Iter {
fn next_back(&mut self) -> Option<Self::Item> {
// Mirror the forward iterator: prefer consuming buffered items from the high data block to
// avoid touching the index once a block has been materialized.
if let Some(block) = &mut self.hi_data_block {
if let Some(item) = block.next_back().map(Ok) {
return Some(item);
}
}

if !self.index_initialized {
// As in `next`, set up the index iterator lazily so that callers can configure range
// bounds before we spend time seeking or loading blocks.
let mut ok = true;

if let Some(key) = &self.range.0 {
self.index_iter.seek_lower(key);
ok = self.index_iter.seek_lower(key);
}
if let Some(key) = &self.range.1 {
self.index_iter.seek_upper(key);

if ok {
if let Some(key) = &self.range.1 {
ok = self.index_iter.seek_upper(key);
}
}

self.index_initialized = true;

if !ok {
// No index span overlaps the requested window; clear both buffers and finish early.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
}
}

loop {
let Some(handle) = self.index_iter.next_back() else {
// NOTE: No more block handles from index,
// Now check lo buffer if it exists
// Once we exhaust the index in reverse order, flush any items that were buffered on
// the low side (set when iterating forward first) before signalling completion.
if let Some(block) = &mut self.lo_data_block {
if let Some(item) = block.next_back().map(Ok) {
return Some(item);
}
}

// NOTE: If there is no more item, we are done
// Nothing left to produce; reset both buffers to keep the iterator reusable.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
};

// NOTE: Load next hi block
// Retrieve the next data block from the cache (or disk on miss) so the high-side reader
// can serve entries in reverse order.
#[allow(clippy::single_match_else)]
let block = match self.cache.get_block(self.segment_id, handle.offset()) {
Some(block) => block,
Expand All @@ -259,19 +306,25 @@ impl DoubleEndedIterator for Iter {

let mut reader = create_data_block_reader(block);

if let Some(key) = &self.range.0 {
reader.seek_lower(key, SeqNo::MAX);
}
if let Some(key) = &self.range.1 {
// Reverse iteration needs to clamp the upper bound first so that `next_back` only
// sees entries ≤ the requested high key.
reader.seek_upper(key, SeqNo::MAX);
}
if let Some(key) = &self.range.0 {
// Apply the lower bound as well so that we never step past the low key when
// iterating backwards through the block.
reader.seek_lower(key, SeqNo::MAX);
}

let item = reader.next_back();

self.hi_offset = handle.offset();
self.hi_data_block = Some(reader);

if let Some(item) = item {
// Emit the first materialized entry immediately to match the forward path and avoid
// storing it in a temporary buffer.
return Some(Ok(item));
}
}
Expand Down