Skip to content

Commit

Permalink
Extract method to drive PageIterator -> RecordReader (#1031) (#1056)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
alamb and tustvold committed Dec 20, 2021
1 parent 7ca3936 commit 1c59023
Showing 1 changed file with 37 additions and 51 deletions.
88 changes: 37 additions & 51 deletions parquet/src/arrow/array_reader.rs
Expand Up @@ -100,6 +100,36 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
/// pages is exhausted.
fn read_records<T: DataType>(
record_reader: &mut RecordReader<T>,
pages: &mut dyn PageIterator,
batch_size: usize,
) -> Result<usize> {
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;

let records_read_once = record_reader.read_records(records_to_read)?;
records_read += records_read_once;

// Record reader exhausted
if records_read_once < records_to_read {
if let Some(page_reader) = pages.next() {
// Read from new page reader (i.e. column chunk)
record_reader.set_page_reader(page_reader?)?;
} else {
// Page reader also exhausted
break;
}
}
}
Ok(records_read)
}

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
pub struct NullArrayReader<T: DataType> {
Expand All @@ -114,14 +144,8 @@ pub struct NullArrayReader<T: DataType> {

impl<T: DataType> NullArrayReader<T> {
/// Construct null array reader.
pub fn new(
mut pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
) -> Result<Self> {
let mut record_reader = RecordReader::<T>::new(column_desc.clone());
if let Some(page_reader) = pages.next() {
record_reader.set_page_reader(page_reader?)?;
}
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
let record_reader = RecordReader::<T>::new(column_desc.clone());

Ok(Self {
data_type: ArrowType::Null,
Expand All @@ -148,25 +172,8 @@ impl<T: DataType> ArrayReader for NullArrayReader<T> {

/// Reads at most `batch_size` records into array.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;

// NB can be 0 if at end of page
let records_read_once = self.record_reader.read_records(records_to_read)?;
records_read += records_read_once;

// Record reader exhausted
if records_read_once < records_to_read {
if let Some(page_reader) = self.pages.next() {
// Read from new page reader
self.record_reader.set_page_reader(page_reader?)?;
} else {
// Page reader also exhausted
break;
}
}
}
let records_read =
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;

// convert to arrays
let array = arrow::array::NullArray::new(records_read);
Expand Down Expand Up @@ -206,7 +213,7 @@ pub struct PrimitiveArrayReader<T: DataType> {
impl<T: DataType> PrimitiveArrayReader<T> {
/// Construct primitive array reader.
pub fn new(
mut pages: Box<dyn PageIterator>,
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
Expand All @@ -218,10 +225,7 @@ impl<T: DataType> PrimitiveArrayReader<T> {
.clone(),
};

let mut record_reader = RecordReader::<T>::new(column_desc.clone());
if let Some(page_reader) = pages.next() {
record_reader.set_page_reader(page_reader?)?;
}
let record_reader = RecordReader::<T>::new(column_desc.clone());

Ok(Self {
data_type,
Expand All @@ -248,25 +252,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {

/// Reads at most `batch_size` records into array.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;

// NB can be 0 if at end of page
let records_read_once = self.record_reader.read_records(records_to_read)?;
records_read += records_read_once;

// Record reader exhausted
if records_read_once < records_to_read {
if let Some(page_reader) = self.pages.next() {
// Read from new page reader
self.record_reader.set_page_reader(page_reader?)?;
} else {
// Page reader also exhausted
break;
}
}
}
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;

let target_type = self.get_data_type().clone();
let arrow_data_type = match T::get_physical_type() {
Expand Down

0 comments on commit 1c59023

Please sign in to comment.