refactor: Make BatchIterator supertrait of Iterator (#85)
evenyag committed Jul 14, 2022
1 parent 81790b4 commit 09fe717
Showing 5 changed files with 42 additions and 18 deletions.
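
The net effect of the refactor is a change in calling convention: instead of a bespoke `fn next(&mut self) -> Result<Option<Batch>>`, a `BatchIterator` is now required to be a standard `Iterator` yielding `Result<Batch>`, so exhaustion is signalled by `None` and errors are reported per item. A minimal, hedged sketch of the new consumer-side pattern, using a simplified stand-in `Batch` and a plain vector-backed iterator rather than the crate's real memtable types:

// Hedged sketch (not part of the commit): the new consumer-side pattern.
// `Batch` here is a simplified stand-in for the storage crate's type.
struct Batch {
    rows: usize,
}

type Error = String;
type Result<T> = std::result::Result<T, Error>;

// Any `Iterator<Item = Result<Batch>>` can now be driven by a plain `for` loop.
fn count_rows(iter: impl Iterator<Item = Result<Batch>>) -> Result<usize> {
    let mut total = 0;
    for batch in iter {
        let batch = batch?; // errors surface per item instead of per poll
        total += batch.rows;
    }
    Ok(total)
}

fn main() -> Result<()> {
    let batches = vec![Ok(Batch { rows: 3 }), Ok(Batch { rows: 4 })];
    assert_eq!(count_rows(batches.into_iter())?, 7);
    Ok(())
}
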
4 changes: 2 additions & 2 deletions src/storage/src/chunk.rs
@@ -19,8 +19,8 @@ impl ChunkReader for ChunkReaderImpl {
     }
 
     async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
-        let mut batch = match self.iter.next()? {
-            Some(b) => b,
+        let mut batch = match self.iter.next() {
+            Some(b) => b?,
             None => return Ok(None),
         };
 
9 changes: 1 addition & 8 deletions src/storage/src/memtable.rs
@@ -84,20 +84,13 @@ pub struct Batch {
     pub values: Vec<VectorRef>,
 }
 
-// TODO(yingwen): [flush] Let BatchIterator be a supertrait of Iterator.
 /// Iterator of memtable.
-pub trait BatchIterator: Send + Sync {
+pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
     /// Returns the schema of this iterator.
     fn schema(&self) -> &MemtableSchema;
 
     /// Returns the ordering of the output rows from this iterator.
     fn ordering(&self) -> RowOrdering;
-
-    /// Fetch next batch from the memtable.
-    ///
-    /// # Panics
-    /// Panics if the iterator has already been exhausted.
-    fn next(&mut self) -> Result<Option<Batch>>;
 }
 
 pub type BatchIteratorPtr = Box<dyn BatchIterator>;
8 changes: 6 additions & 2 deletions src/storage/src/memtable/btree.rs
@@ -93,9 +93,13 @@ impl BatchIterator for BTreeIterator {
     fn ordering(&self) -> RowOrdering {
         RowOrdering::Key
     }
+}
+
+impl Iterator for BTreeIterator {
+    type Item = Result<Batch>;
 
-    fn next(&mut self) -> Result<Option<Batch>> {
-        Ok(self.next_batch())
+    fn next(&mut self) -> Option<Result<Batch>> {
+        self.next_batch().map(Ok)
     }
 }
 
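
For implementors, the split shown above means the batch-production logic lives in a standard `Iterator` impl, while `BatchIterator` itself only carries the memtable-specific metadata methods. A hedged sketch of a minimal implementor, with simplified stand-ins for `Batch`, `MemtableSchema`, `RowOrdering`, and the crate's `Result` alias (none of these bodies are the crate's real code):

// Hedged sketch (not part of the commit): a toy in-memory implementor.
// All non-std types are simplified stand-ins for the crate's own.
type Result<T> = std::result::Result<T, String>;

struct Batch;
struct MemtableSchema;
enum RowOrdering {
    Unordered,
    Key,
}

trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
    fn schema(&self) -> &MemtableSchema;
    fn ordering(&self) -> RowOrdering;
}

struct VecIterator {
    schema: MemtableSchema,
    batches: std::vec::IntoIter<Batch>,
}

impl Iterator for VecIterator {
    type Item = Result<Batch>;

    // Exhaustion is `None`; a failed poll would be `Some(Err(..))`.
    fn next(&mut self) -> Option<Result<Batch>> {
        self.batches.next().map(Ok)
    }
}

impl BatchIterator for VecIterator {
    fn schema(&self) -> &MemtableSchema {
        &self.schema
    }

    fn ordering(&self) -> RowOrdering {
        RowOrdering::Unordered
    }
}

fn main() {
    let iter = VecIterator {
        schema: MemtableSchema,
        batches: vec![Batch, Batch].into_iter(),
    };
    // For-loop support and adapters like `count()` come from the Iterator supertrait.
    assert_eq!(iter.count(), 2);
}
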
34 changes: 30 additions & 4 deletions src/storage/src/memtable/tests.rs
@@ -103,7 +103,8 @@ fn check_iter_content(
     values: &[Option<u64>],
 ) {
     let mut index = 0;
-    while let Some(batch) = iter.next().unwrap() {
+    for batch in iter {
+        let batch = batch.unwrap();
         check_batch_valid(&batch);
 
         let row_num = batch.keys[0].len();
@@ -177,7 +178,9 @@ struct TestContext {
 fn write_iter_memtable_case(ctx: &TestContext) {
     // Test iterating an empty memtable.
     let mut iter = ctx.memtable.iter(IterContext::default()).unwrap();
-    assert!(iter.next().unwrap().is_none());
+    assert!(iter.next().is_none());
+    // Poll the empty iterator again.
+    assert!(iter.next().is_none());
     assert_eq!(0, ctx.memtable.bytes_allocated());
 
     // Init test data.
@@ -265,7 +268,8 @@ fn test_write_iter_memtable() {
 
 fn check_iter_batch_size(iter: &mut dyn BatchIterator, total: usize, batch_size: usize) {
     let mut remains = total;
-    while let Some(batch) = iter.next().unwrap() {
+    for batch in iter {
+        let batch = batch.unwrap();
         check_batch_valid(&batch);
 
         let row_num = batch.keys[0].len();
@@ -471,4 +475,26 @@ fn test_sequence_visibility() {
     });
 }
 
-// TODO(yingwen): Test key overwrite in same batch.
+#[test]
+fn test_iter_after_none() {
+    let tester = MemtableTester::default();
+    tester.run_testcase(|ctx| {
+        write_kvs(
+            &*ctx.memtable,
+            10, // sequence
+            ValueType::Put,
+            &[(1000, 0), (1001, 1), (1002, 2)], // keys
+            &[Some(0), Some(1), Some(2)], // values
+        );
+
+        let iter_ctx = IterContext {
+            batch_size: 4,
+            ..Default::default()
+        };
+
+        let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
+        assert!(iter.next().is_some());
+        assert!(iter.next().is_none());
+        assert!(iter.next().is_none());
+    });
+}
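
One behavioural note on the new test above: the std `Iterator` contract does not by itself guarantee that `next()` keeps returning `None` after exhaustion, which is exactly what `test_iter_after_none` pins down for the memtable iterator. Callers that need that guarantee for an arbitrary iterator can also rely on the standard `fuse()` adapter; a small illustrative sketch (not part of the commit):

// `fuse()` guarantees that once an iterator yields `None`,
// every later call to `next()` also yields `None`.
fn main() {
    let mut fused = vec![1, 2].into_iter().fuse();
    assert_eq!(fused.next(), Some(1));
    assert_eq!(fused.next(), Some(2));
    assert_eq!(fused.next(), None);
    assert_eq!(fused.next(), None); // still None after exhaustion
}
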
5 changes: 3 additions & 2 deletions src/storage/src/sst/parquet.rs
@@ -46,7 +46,7 @@ impl<'a> ParquetWriter<'a> {
     /// Iterates memtable and writes rows to Parquet file.
     /// A chunk of records yielded from each iteration with a size given
     /// in config will be written to a single row group.
-    async fn write_rows(mut self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
+    async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
         let schema = memtable_schema_to_arrow_schema(self.iter.schema());
         let object = self.object_store.object(self.file_name);
 
@@ -70,7 +70,8 @@ impl<'a> ParquetWriter<'a> {
         )
         .context(WriteParquetSnafu)?;
 
-        while let Some(batch) = self.iter.next()? {
+        for batch in self.iter {
+            let batch = batch?;
             sink.send(Chunk::new(
                 batch
                     .keys
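
Two details in this last hunk follow from the supertrait change. `write_rows` can drop the `mut` on `self` because the `for` loop takes `self.iter` by value through `IntoIterator` instead of repeatedly borrowing it mutably. And `self.iter` is a `BatchIteratorPtr` (`Box<dyn BatchIterator>`), which works directly in a `for` loop because a trait object implements its supertraits and `Box` forwards `Iterator`. A small hedged sketch of that mechanism with stand-in types (not the crate's real code):

// Why a boxed trait object whose trait has `Iterator` as a supertrait
// can be consumed by a `for` loop.
trait BatchLike: Iterator<Item = u32> {} // stand-in for BatchIterator

struct Numbers(std::ops::Range<u32>);

impl Iterator for Numbers {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.0.next()
    }
}

impl BatchLike for Numbers {}

fn main() {
    // `dyn BatchLike` implements its supertrait `Iterator`, and `Box<I>`
    // forwards `Iterator`, so the boxed object can be moved into the loop.
    let boxed: Box<dyn BatchLike> = Box::new(Numbers(0..3));
    let mut sum = 0;
    for n in boxed {
        sum += n;
    }
    assert_eq!(sum, 3);
}
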
