diff --git a/src/segment/block/decoder.rs b/src/segment/block/decoder.rs
index 59bc74a5..1f2c9d51 100644
--- a/src/segment/block/decoder.rs
+++ b/src/segment/block/decoder.rs
@@ -162,10 +162,19 @@ impl<'a, Item: Decodable, Parsed: ParsedItem> Decoder<'a, Item, Pa
         Item::parse_restart_key(&mut cursor, pos, bytes).expect("should exist")
     }
 
-    fn partition_point(
-        &self,
-        pred: impl Fn(&[u8]) -> bool,
-    ) -> Option<(/* offset */ usize, /* idx */ usize)> {
+    fn partition_point<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
+    where
+        F: Fn(&[u8]) -> bool,
+    {
+        // The first pass over the binary index emulates `Iterator::partition_point` over the
+        // restart heads that are in natural key order. We keep track of both the byte offset and
+        // the restart index because callers need the offset to seed the linear scanner, while the
+        // index is sometimes reused (for example by `seek_upper`).
+        //
+        // In contrast to the usual `partition_point`, we intentionally return the *last* restart
+        // entry when the predicate continues to hold for every head key. Forward scans rely on
+        // this behaviour to land on the final restart interval and resume the linear scan there
+        // instead of erroneously reporting "not found".
         let binary_index = self.get_binary_index_reader();
 
         debug_assert!(
@@ -198,16 +207,27 @@ impl<'a, Item: Decodable, Parsed: ParsedItem> Decoder<'a, Item, Pa
             return Some((0, 0));
         }
 
+        if left == binary_index.len() {
+            let idx = binary_index.len() - 1;
+            let offset = binary_index.get(idx);
+            return Some((offset, idx));
+        }
+
         let offset = binary_index.get(left - 1);
 
         Some((offset, left - 1))
     }
 
     // TODO:
-    fn partition_point_2(
-        &self,
-        pred: impl Fn(&[u8]) -> bool,
-    ) -> Option<(/* offset */ usize, /* idx */ usize)> {
+    fn partition_point_2<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
+    where
+        F: Fn(&[u8]) -> bool,
+    {
+        // `partition_point_2` mirrors `partition_point` but keeps the *next* restart entry instead
+        // of the previous one. This variant is used exclusively by reverse scans (`seek_upper`)
+        // that want the first restart whose head key exceeds the predicate. Returning the raw
+        // offset preserves the ability to reuse the linear scanning infrastructure without
+        // duplicating decoder logic.
         let binary_index = self.get_binary_index_reader();
 
         debug_assert!(
@@ -257,9 +277,9 @@ impl<'a, Item: Decodable, Parsed: ParsedItem> Decoder<'a, Item, Pa
     pub fn seek(&mut self, pred: impl Fn(&[u8]) -> bool, second_partition: bool) -> bool {
         // TODO: make this nicer, maybe predicate that can affect the resulting index...?
         let result = if second_partition {
-            self.partition_point_2(pred)
+            self.partition_point_2(&pred)
        } else {
-            self.partition_point(pred)
+            self.partition_point(&pred)
        };
 
         // Binary index lookup
@@ -267,6 +287,30 @@ impl<'a, Item: Decodable, Parsed: ParsedItem> Decoder<'a, Item, Pa
             return false;
         };
 
+        if second_partition && self.restart_interval == 1 && pred(self.get_key_at(offset)) {
+            // `second_partition == true` means we ran the "look one restart ahead" search used by
+            // index blocks. When the predicate is still true at the chosen restart head, it means
+            // the caller asked us to seek strictly beyond the last entry. In that case we skip any
+            // costly parsing and flip both scanners into an "exhausted" state so the outer iterator
+            // immediately reports EOF.
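+            //
+            // For example, seeking past the last separator key (say, to "zzz" when the last
+            // restart head is "def") must yield EOF in both directions; the
+            // `v3_index_block_iter_too_far*` tests in `index_block/iter.rs` exercise this case.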
+            let end = self.block.data.len();
+
+            self.lo_scanner.offset = end;
+            self.lo_scanner.remaining_in_interval = 0;
+            self.lo_scanner.base_key_offset = None;
+
+            self.hi_scanner.offset = end;
+            self.hi_scanner.ptr_idx = usize::MAX;
+            self.hi_scanner.stack.clear();
+            self.hi_scanner.base_key_offset = Some(0);
+
+            return false;
+        }
+
         self.lo_scanner.offset = offset;
 
         true
@@ -275,11 +315,11 @@ impl<'a, Item: Decodable, Parsed: ParsedItem> Decoder<'a, Item, Pa
     /// Seeks the upper bound using the given predicate.
     ///
     /// Returns `false` if the key cannot possibly exist.
     pub fn seek_upper(&mut self, pred: impl Fn(&[u8]) -> bool, second_partition: bool) -> bool {
         let result = if second_partition {
-            self.partition_point_2(pred)
+            self.partition_point_2(&pred)
         } else {
-            self.partition_point(pred)
+            self.partition_point(&pred)
         };
 
         // Binary index lookup
diff --git a/src/segment/index_block/iter.rs b/src/segment/index_block/iter.rs
index f4da5681..e12680ed 100644
--- a/src/segment/index_block/iter.rs
+++ b/src/segment/index_block/iter.rs
@@ -271,11 +271,7 @@ mod tests {
         Ok(())
     }
 
-    // TODO: seek and seek_upper need separate binary search routines...?
-    // TODO: because seeking in [a,b,c] to e should return None for seek,
-    // TODO: but not for seek_upper
     #[test]
-    #[ignore = "should not seek"]
     fn v3_index_block_iter_too_far() -> crate::Result<()> {
         let items = [
             KeyedBlockHandle::new(b"b".into(), BlockOffset(0), 6_000),
@@ -309,6 +305,40 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn v3_index_block_iter_too_far_next_back() -> crate::Result<()> {
+        let items = [
+            KeyedBlockHandle::new(b"b".into(), BlockOffset(0), 6_000),
+            KeyedBlockHandle::new(b"bcdef".into(), BlockOffset(6_000), 7_000),
+            KeyedBlockHandle::new(b"def".into(), BlockOffset(13_000), 5_000),
+        ];
+
+        let bytes = IndexBlock::encode_into_vec(&items)?;
+
+        let index_block = IndexBlock::new(Block {
+            data: bytes.into(),
+            header: Header {
+                block_type: BlockType::Index,
+                checksum: Checksum::from_raw(0),
+                data_length: 0,
+                uncompressed_length: 0,
+            },
+        });
+
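+        // "zzz" sorts after every separator key in the block, so the seek must fail and
+        // iteration from either direction must immediately report EOF.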
+        let mut iter = index_block.iter();
+        assert!(!iter.seek(b"zzz"), "should not seek");
+
+        assert!(iter.next().is_none(), "iterator should be exhausted");
+        assert!(
+            iter.next_back().is_none(),
+            "reverse iterator should also be exhausted"
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn v3_index_block_iter_span() -> crate::Result<()> {
         let items = [
diff --git a/src/segment/iter.rs b/src/segment/iter.rs
index 66ac0a3d..dead432f 100644
--- a/src/segment/iter.rs
+++ b/src/segment/iter.rs
@@ -130,6 +130,8 @@ impl Iterator for Iter {
     type Item = crate::Result<InternalValue>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        // Always try to keep iterating inside the already-materialized low data block first; this
+        // lets callers consume multiple entries without touching the index or cache again.
         if let Some(block) = &mut self.lo_data_block {
             if let Some(item) = block.next().map(Ok) {
                 return Some(item);
@@ -137,32 +139,54 @@ impl Iterator for Iter {
             }
         }
 
         if !self.index_initialized {
+            // The index iterator is initialized lazily on the first call so that the constructor
+            // does not seek eagerly. This matters because range bounds might be configured *after*
+            // `Iter::new`, and we only want to pay the seek cost if iteration actually happens.
+            let mut ok = true;
+
             if let Some(key) = &self.range.0 {
-                self.index_iter.seek_lower(key);
+                // Seek to the first block whose end key is ≥ the lower bound. If this fails,
+                // we can immediately conclude the range is empty.
+                ok = self.index_iter.seek_lower(key);
             }
-            if let Some(key) = &self.range.1 {
-                self.index_iter.seek_upper(key);
+
+            if ok {
+                if let Some(key) = &self.range.1 {
+                    // Narrow the iterator further by skipping any blocks strictly above the upper
+                    // bound. Again, a miss means the range is empty.
+                    ok = self.index_iter.seek_upper(key);
+                }
             }
+
             self.index_initialized = true;
+
+            if !ok {
+                // No block in the index overlaps the requested window, so we clear state and return
+                // EOF without attempting to touch any data blocks.
+                self.lo_data_block = None;
+                self.hi_data_block = None;
+                return None;
+            }
         }
 
         loop {
             let Some(handle) = self.index_iter.next() else {
-                // NOTE: No more block handles from index,
-                // Now check hi buffer if it exists
+                // No more block handles coming from the index. Flush any pending items buffered on
+                // the high side (used by reverse iteration) before signalling completion.
                 if let Some(block) = &mut self.hi_data_block {
                     if let Some(item) = block.next().map(Ok) {
                         return Some(item);
                     }
                 }
 
-                // NOTE: If there is no more item, we are done
+                // Nothing left to serve; drop both buffers so the iterator can be reused safely.
                 self.lo_data_block = None;
                 self.hi_data_block = None;
                 return None;
             };
 
-            // NOTE: Load next lo block
+            // Load the next data block referenced by the index handle. We try the shared block
+            // cache first to avoid hitting the filesystem, and fall back to `load_block` on miss.
             #[allow(clippy::single_match_else)]
             let block = match self.cache.get_block(self.segment_id, handle.offset()) {
                 Some(block) => block,
@@ -185,9 +209,12 @@ impl Iterator for Iter {
             let mut reader = create_data_block_reader(block);
 
             if let Some(key) = &self.range.0 {
+                // Each block is self-contained, so we have to apply range bounds again to discard
+                // entries that precede the requested lower key.
                 reader.seek_lower(key, SeqNo::MAX);
             }
             if let Some(key) = &self.range.1 {
+                // Ditto for the upper bound: advance the block-local iterator to the right spot.
                 reader.seek_upper(key, SeqNo::MAX);
             }
 
@@ -197,6 +224,8 @@ impl Iterator for Iter {
             let item = reader.next();
             self.lo_data_block = Some(reader);
             if let Some(item) = item {
+                // Serving the first item immediately avoids stashing it in a temporary buffer and
+                // keeps block iteration semantics identical to the simple case at the top.
                 return Some(Ok(item));
             }
         }
@@ -205,6 +234,8 @@ impl Iterator for Iter {
 
 impl DoubleEndedIterator for Iter {
     fn next_back(&mut self) -> Option<Self::Item> {
+        // Mirror the forward iterator: prefer consuming buffered items from the high data block to
+        // avoid touching the index once a block has been materialized.
         if let Some(block) = &mut self.hi_data_block {
             if let Some(item) = block.next_back().map(Ok) {
                 return Some(item);
@@ -212,32 +243,50 @@ impl DoubleEndedIterator for Iter {
             }
         }
 
         if !self.index_initialized {
+            // As in `next`, set up the index iterator lazily so that callers can configure range
+            // bounds before we spend time seeking or loading blocks.
+            let mut ok = true;
+
             if let Some(key) = &self.range.0 {
-                self.index_iter.seek_lower(key);
+                ok = self.index_iter.seek_lower(key);
             }
-            if let Some(key) = &self.range.1 {
-                self.index_iter.seek_upper(key);
+
+            if ok {
+                if let Some(key) = &self.range.1 {
+                    ok = self.index_iter.seek_upper(key);
+                }
             }
+
             self.index_initialized = true;
+
+            if !ok {
+                // No index span overlaps the requested window; clear both buffers and finish early.
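+                // (This is the same cleanup as in `next`, so callers that mix forward and
+                // backward calls observe a consistent EOF from both directions.)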
+                self.lo_data_block = None;
+                self.hi_data_block = None;
+                return None;
+            }
         }
 
         loop {
             let Some(handle) = self.index_iter.next_back() else {
-                // NOTE: No more block handles from index,
-                // Now check lo buffer if it exists
+                // Once we exhaust the index in reverse order, flush any items that were buffered on
+                // the low side (set when iterating forward first) before signalling completion.
                 if let Some(block) = &mut self.lo_data_block {
                     if let Some(item) = block.next_back().map(Ok) {
                         return Some(item);
                     }
                 }
 
-                // NOTE: If there is no more item, we are done
+                // Nothing left to produce; reset both buffers to keep the iterator reusable.
                 self.lo_data_block = None;
                 self.hi_data_block = None;
                 return None;
             };
 
-            // NOTE: Load next hi block
+            // Retrieve the next data block from the cache (or disk on miss) so the high-side reader
+            // can serve entries in reverse order.
             #[allow(clippy::single_match_else)]
             let block = match self.cache.get_block(self.segment_id, handle.offset()) {
                 Some(block) => block,
@@ -259,12 +306,16 @@ impl DoubleEndedIterator for Iter {
             let mut reader = create_data_block_reader(block);
 
-            if let Some(key) = &self.range.0 {
-                reader.seek_lower(key, SeqNo::MAX);
-            }
             if let Some(key) = &self.range.1 {
+                // Reverse iteration needs to clamp the upper bound first so that `next_back` only
+                // sees entries ≤ the requested high key.
                 reader.seek_upper(key, SeqNo::MAX);
             }
+            if let Some(key) = &self.range.0 {
+                // Apply the lower bound as well so that we never step past the low key when
+                // iterating backwards through the block.
+                reader.seek_lower(key, SeqNo::MAX);
+            }
 
             let item = reader.next_back();
 
             self.hi_data_block = Some(reader);
 
             if let Some(item) = item {
+                // Emit the first materialized entry immediately to match the forward path and avoid
+                // storing it in a temporary buffer.
                 return Some(Ok(item));
             }
         }