Skip to content

Commit

Permalink
Support peek_next_page and skip_next_page in InMemoryPageReader (#2407)
Browse files Browse the repository at this point in the history
* Support peek_next_page and skip_next_page in InMemoryPageReader

* fix comment
  • Loading branch information
Ted-Jiang committed Aug 12, 2022
1 parent d11b388 commit ee2818e
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 6 deletions.
182 changes: 182 additions & 0 deletions parquet/src/arrow/record_reader/mod.rs
Expand Up @@ -786,4 +786,186 @@ mod tests {
assert_eq!(record_reader.num_records(), 8);
assert_eq!(record_reader.num_values(), 14);
}

#[test]
fn test_skip_required_records() {
// Construct column schema
let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";
let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();

// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

// First page

// Records data:
// test_schema
// leaf: 4
// test_schema
// leaf: 7
// test_schema
// leaf: 6
// test_schema
// left: 3
// test_schema
// left: 2
{
let values = [4, 7, 6, 3, 2];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.skip_records(2).unwrap());
assert_eq!(0, record_reader.num_records());
assert_eq!(0, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(3, record_reader.num_records());
assert_eq!(3, record_reader.num_values());
}

// Second page

// Records data:
// test_schema
// leaf: 8
// test_schema
// leaf: 9
{
let values = [8, 9];
let mut pb = DataPageBuilderImpl::new(desc, 2, true);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.skip_records(10).unwrap());
assert_eq!(3, record_reader.num_records());
assert_eq!(3, record_reader.num_values());
assert_eq!(0, record_reader.read_records(10).unwrap());
}

let mut bb = Int32BufferBuilder::new(3);
bb.append_slice(&[6, 3, 2]);
let expected_buffer = bb.finish();
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
}

#[test]
fn test_skip_optional_records() {
// Construct column schema
let message_type = "
message test_schema {
OPTIONAL Group test_struct {
OPTIONAL INT32 leaf;
}
}
";

let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();

// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

// First page

// Records data:
// test_schema
// test_struct
// test_schema
// test_struct
// leaf: 7
// test_schema
// test_schema
// test_struct
// leaf: 6
// test_schema
// test_struct
// leaf: 6
{
let values = [7, 6, 3];
//empty, non-empty, empty, non-empty, non-empty
let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.skip_records(2).unwrap());
assert_eq!(0, record_reader.num_records());
assert_eq!(0, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(3, record_reader.num_records());
assert_eq!(3, record_reader.num_values());
}

// Second page

// Records data:
// test_schema
// test_schema
// test_struct
// left: 8
{
let values = [8];
//empty, non-empty
let def_levels = [0i16, 2i16];
let mut pb = DataPageBuilderImpl::new(desc, 2, true);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.skip_records(10).unwrap());
assert_eq!(3, record_reader.num_records());
assert_eq!(3, record_reader.num_values());
assert_eq!(0, record_reader.read_records(10).unwrap());
}

// Verify result def levels
let mut bb = Int16BufferBuilder::new(7);
bb.append_slice(&[0i16, 2i16, 2i16]);
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels()
);

// Verify bitmap
let expected_valid = &[false, true, true];
let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

// Verify result record data
let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();

let expected = &[0, 6, 3];
assert_eq!(actual_values.len(), expected.len());

// Only validate valid values are equal
let iter = expected_valid.iter().zip(actual_values).zip(expected);
for ((valid, actual), expected) in iter {
if *valid {
assert_eq!(actual, expected)
}
}
}
}
31 changes: 25 additions & 6 deletions parquet/src/util/test_common/page_util.rs
Expand Up @@ -24,6 +24,7 @@ use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
use std::iter::Peekable;
use std::mem;

pub trait DataPageBuilder {
Expand Down Expand Up @@ -127,8 +128,8 @@ impl DataPageBuilder for DataPageBuilderImpl {
encoding: self.encoding.unwrap(),
num_nulls: 0, /* set to dummy value - don't need this when reading
* data page */
num_rows: self.num_values, /* also don't need this when reading
* data page */
num_rows: self.num_values, /* num_rows only needs in skip_records, now we not support skip REPEATED field,
* so we can assume num_values == num_rows */
def_levels_byte_len: self.def_levels_byte_len,
rep_levels_byte_len: self.rep_levels_byte_len,
is_compressed: false,
Expand All @@ -149,13 +150,13 @@ impl DataPageBuilder for DataPageBuilderImpl {

/// A utility page reader which stores pages in memory.
pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
page_iter: P,
page_iter: Peekable<P>,
}

impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
Self {
page_iter: pages.into_iter(),
page_iter: pages.into_iter().peekable(),
}
}
}
Expand All @@ -166,11 +167,29 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
unimplemented!()
if let Some(x) = self.page_iter.peek() {
match x {
Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
num_rows: *num_values as usize,
is_dict: false,
})),
Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
num_rows: *num_rows as usize,
is_dict: false,
})),
Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
num_rows: 0,
is_dict: true,
})),
}
} else {
Ok(None)
}
}

fn skip_next_page(&mut self) -> Result<()> {
unimplemented!()
self.page_iter.next();
Ok(())
}
}

Expand Down

0 comments on commit ee2818e

Please sign in to comment.