Use offset index in ParquetRecordBatchStream (#2526)
* Use offset index in ParquetRecordBatchStream

* remove debugging cruft and fix clippy warning

* Do not use ReadOptions

* Fix bug with dictionary pages

* Review comments

* Fix bug in page skipping logic
thinkharderdev committed Aug 20, 2022
1 parent 0f45932 commit 1eb6c45
Showing 5 changed files with 273 additions and 15 deletions.
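For orientation before the per-file diffs, here is a minimal sketch of the usage this change enables, mirroring the new tests further down. It assumes the blanket `AsyncFileReader` impl covers a `tokio::fs::File` and that these import paths match this release; the file path and projected leaf-column indexes are placeholders.

```rust
use arrow::record_batch::RecordBatch;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet::arrow::ProjectionMask;

// Open a Parquet file asynchronously and ask the builder to fetch the page
// and offset indexes up front, so row selections can skip whole pages and
// only the byte ranges of the required pages are fetched.
async fn read_with_page_index() -> Vec<RecordBatch> {
    let file = tokio::fs::File::open("data.parquet").await.unwrap();

    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
        .await
        .unwrap();

    // Project two leaf columns, as the tests below do.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
    let stream = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()
        .unwrap();

    stream.try_collect().await.unwrap()
}
```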
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -207,7 +207,7 @@ pub trait ArrowReader {
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
page_index: bool,
pub(crate) page_index: bool,
}

impl ArrowReaderOptions {
33 changes: 32 additions & 1 deletion parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
break;
if !(selector.skip || current_page_included) {
let start = page.offset as usize;
let end = start + page.compressed_page_size as usize;
ranges.push(start..end);
}
current_selector = selectors.next()
}
}

@@ -564,5 +569,31 @@ mod tests {

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select to final page boundary
RowSelector::select(12),
RowSelector::skip(1),
// Skip across final page boundary
RowSelector::skip(8),
// Select from final page
RowSelector::select(4),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
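To make the intent of these byte-range assertions concrete, here is a standalone sketch (illustrative types only, not the crate's `RowSelection::scan_ranges`): it uses a simplified stand-in for `parquet_format::PageLocation` and returns the byte range of every page that holds at least one selected row, ignoring the selector bookkeeping handled above. The layout is loosely modeled on what the tests above appear to use.

```rust
use std::ops::Range;

// Simplified stand-in for parquet_format::PageLocation.
struct PageLoc {
    offset: usize,               // byte offset of the page within the column chunk
    compressed_page_size: usize, // page size in bytes
    first_row_index: usize,      // index of the first row stored in the page
}

// Return the byte ranges of pages containing at least one selected row.
fn ranges_for_selection(
    pages: &[PageLoc],
    selected_rows: &[usize],
    total_rows: usize,
) -> Vec<Range<usize>> {
    pages
        .iter()
        .enumerate()
        .filter_map(|(i, page)| {
            // Rows covered by this page end where the next page begins.
            let end_row = pages
                .get(i + 1)
                .map(|next| next.first_row_index)
                .unwrap_or(total_rows);
            let hit = selected_rows
                .iter()
                .any(|&row| row >= page.first_row_index && row < end_row);
            hit.then(|| page.offset..page.offset + page.compressed_page_size)
        })
        .collect()
}

fn main() {
    // Seven pages of 10 rows / 10 bytes each, starting at byte offset 10.
    let pages: Vec<PageLoc> = (0..7usize)
        .map(|i| PageLoc {
            offset: 10 + i * 10,
            compressed_page_size: 10,
            first_row_index: i * 10,
        })
        .collect();

    // Selecting rows 10..20 and 35..45 touches the second, fourth, and fifth pages.
    let selected: Vec<usize> = (10..20).chain(35..45).collect();
    assert_eq!(
        ranges_for_selection(&pages, &selected, 70),
        vec![20..30, 40..50, 50..60]
    );
}
```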
235 changes: 227 additions & 8 deletions parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use parquet_format::OffsetIndex;
use thrift::protocol::TCompactInputProtocol;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;

use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
RowFilter, RowSelection,
evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

@@ -108,6 +110,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

use crate::file::page_index::index_reader;
use crate::file::FOOTER_SIZE;

use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}

/// Create a new [`ParquetRecordBatchStreamBuilder`] for the provided async source,
/// fetching the page and offset indexes when [`ArrowReaderOptions`] requests them.
pub async fn new_with_options(
mut input: T,
options: ArrowReaderOptions,
) -> Result<Self> {
let mut metadata = input.get_metadata().await?;

if options.page_index
&& metadata
.page_indexes()
.zip(metadata.offset_indexes())
.is_none()
{
let mut fetch_ranges = vec![];
let mut index_lengths: Vec<Vec<usize>> = vec![];

for rg in metadata.row_groups() {
let (loc_offset, loc_length) =
index_reader::get_location_offset_and_total_length(rg.columns())?;

let (idx_offset, idx_lengths) =
index_reader::get_index_offset_and_lengths(rg.columns())?;
let idx_length = idx_lengths.iter().sum::<usize>();

// If index data is missing, return without any indexes
if loc_length == 0 || idx_length == 0 {
return Self::new_builder(AsyncReader(input), metadata, options);
}

fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
index_lengths.push(idx_lengths);
}

let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut index_lengths = index_lengths.into_iter();

let mut row_groups = metadata.row_groups().to_vec();

let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in row_groups.iter_mut() {
let columns = rg.columns();

let location_data = chunks.next().unwrap();
let mut cursor = Cursor::new(location_data);
let mut offset_index = vec![];

for _ in 0..columns.len() {
let mut prot = TCompactInputProtocol::new(&mut cursor);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
offset_index.push(offset.page_locations);
}

rg.set_page_offset(offset_index.clone());
offset_indexes.push(offset_index);

let index_data = chunks.next().unwrap();
let index_lengths = index_lengths.next().unwrap();

let mut start = 0;
let data = index_lengths.into_iter().map(|length| {
let r = index_data.slice(start..start + length);
start += length;
r
});

let indexes = rg
.columns()
.iter()
.zip(data)
.map(|(column, data)| {
let column_type = column.column_type();
index_reader::deserialize_column_index(&data, column_type)
})
.collect::<Result<Vec<_>>>()?;
columns_indexes.push(indexes);
}

metadata = Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
row_groups,
Some(columns_indexes),
Some(offset_indexes),
));
}

Self::new_builder(AsyncReader(input), metadata, options)
}

/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
let fetch_ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.enumerate()
.into_iter()
.filter_map(|(idx, chunk)| {
.filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
let ranges = selection.scan_ranges(&page_locations[idx]);
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}

ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets
.push(ranges.iter().map(|range| range.start).collect());

ranges
})
})
@@ -687,7 +793,6 @@ mod tests {
use crate::file::page_index::index_reader;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;

use futures::TryStreamExt;
use std::sync::Mutex;

@@ -763,6 +868,70 @@
);
}

#[tokio::test]
async fn test_async_reader_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();

// The builder should have page and offset indexes loaded now
let metadata_with_index = builder.metadata();

// Check offset indexes are present for all columns
for rg in metadata_with_index.row_groups() {
let page_locations = rg
.page_offset_index()
.as_ref()
.expect("expected page offset index");
assert_eq!(page_locations.len(), rg.columns().len())
}

// Check page indexes are present for all columns
let page_indexes = metadata_with_index
.page_indexes()
.expect("expected page indexes");
for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
assert_eq!(page_indexes[idx].len(), rg.columns().len())
}

let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();

let async_batches: Vec<_> = stream.try_collect().await.unwrap();

let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();

assert_eq!(async_batches, sync_batches);
}

#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,56 @@ mod tests {
assert_eq!(requests.lock().unwrap().len(), 3);
}

#[tokio::test]
async fn test_row_filter_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let a_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![1]),
|batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
);

let b_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![2]),
|batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
);

let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

let options = ArrowReaderOptions::new().with_page_index(true);
let stream =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap()
.with_projection(mask.clone())
.with_batch_size(1024)
.with_row_filter(filter)
.build()
.unwrap();

let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

assert_eq!(total_rows, 730);
}

#[tokio::test]
async fn test_in_memory_row_group_sparse() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1101,7 @@
let mut skip = true;
let mut pages = offset_index[0].iter().peekable();

// Setup `RowSelection` so that we can skip every other page
// Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
let mut expected_page_requests: Vec<Range<usize>> = vec![];
while let Some(page) = pages.next() {
@@ -906,7 +1125,7 @@
let selection = RowSelection::from(selectors);

let (_factory, _reader) = reader_factory
.read_row_group(0, Some(selection), projection, 48)
.read_row_group(0, Some(selection), projection.clone(), 48)
.await
.expect("reading row group");

2 changes: 1 addition & 1 deletion parquet/src/column/reader.rs
@@ -312,7 +312,7 @@ where

// If the page has no more rows than the remaining records to
// be skipped, skip the entire page
if metadata.num_rows < remaining {
if metadata.num_rows <= remaining {
self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
continue;
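The `<` to `<=` change above matters at an exact page boundary: a page holding exactly `remaining` rows should be dropped wholesale rather than handed to record-level skipping. Below is a standalone sketch of that whole-page skip loop (illustrative only, not the crate's `ColumnReader`).

```rust
// Illustrative only: a simplified whole-page skip loop. `page_row_counts`
// holds the row counts of successive pages; `remaining` is how many records
// still need to be skipped.
fn pages_skipped_whole(page_row_counts: &[usize], mut remaining: usize) -> usize {
    let mut skipped_pages = 0;
    for &num_rows in page_row_counts {
        if remaining == 0 {
            break;
        }
        // The fixed condition: a page with no more rows than `remaining`
        // can be skipped in its entirety without decoding it.
        if num_rows <= remaining {
            remaining -= num_rows;
            skipped_pages += 1;
        } else {
            // A partial skip inside this page is handled by the
            // record-level path in the real reader.
            remaining = 0;
        }
    }
    skipped_pages
}

fn main() {
    // Skip exactly one page's worth of rows: with `<` this page would not
    // have been skipped wholesale; with `<=` it is.
    assert_eq!(pages_skipped_whole(&[100, 100], 100), 1);
    // Skipping 150 rows drops the first page and leaves 50 for the second.
    assert_eq!(pages_skipped_whole(&[100, 100], 150), 1);
}
```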
