Add explicit column mask construction in parquet: ProjectionMask (#1701) (#1716)

* Add explicit column mask construction (#1701)

* Fix ParquetRecordBatchStream

* Fix docs

* Fix async_reader test

* Review feedback
tustvold committed May 24, 2022
1 parent 6fbc9a4 commit ca1d85f
Showing 8 changed files with 237 additions and 265 deletions.
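
For orientation before the per-file hunks, a minimal before/after sketch of the caller-facing change, assembled from calls visible in this diff. The file path and the wrapper function are illustrative, not part of the commit.

use std::fs::File;
use std::sync::Arc;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
use parquet::file::reader::SerializedFileReader;

// Read only the first leaf column of a hypothetical file.
fn read_first_leaf_column() {
    let file = File::open("data.parquet").unwrap();
    let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

    // Before this commit: .get_record_reader_by_columns(vec![0], 1024)
    // After: construct the projection explicitly against the parquet schema.
    let mask = ProjectionMask::leaves(
        file_reader.metadata().file_metadata().schema_descr(),
        [0],
    );

    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
    let batches = arrow_reader
        .get_record_reader_by_columns(mask, 1024)
        .unwrap();
    for batch in batches {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}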
22 changes: 9 additions & 13 deletions parquet/src/arrow/array_reader/builder.rs
@@ -32,6 +32,7 @@ use crate::arrow::converter::{
     IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
 };
 use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
+use crate::arrow::ProjectionMask;
 use crate::basic::Type as PhysicalType;
 use crate::data_type::{
     BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
@@ -40,21 +41,15 @@ use crate::data_type::{
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
 
-/// Create array reader from parquet schema, column indices, and parquet file reader.
-pub fn build_array_reader<T>(
+/// Create array reader from parquet schema, projection mask, and parquet file reader.
+pub fn build_array_reader(
     parquet_schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
-    column_indices: T,
+    mask: ProjectionMask,
     row_groups: Box<dyn RowGroupCollection>,
-) -> Result<Box<dyn ArrayReader>>
-where
-    T: IntoIterator<Item = usize>,
-{
-    let field = convert_schema(
-        parquet_schema.as_ref(),
-        column_indices,
-        Some(arrow_schema.as_ref()),
-    )?;
+) -> Result<Box<dyn ArrayReader>> {
+    let field =
+        convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
 
     match &field {
         Some(field) => build_reader(field, row_groups.as_ref()),
@@ -346,6 +341,7 @@ mod tests {
             Arc::new(SerializedFileReader::new(file).unwrap());
 
         let file_metadata = file_reader.metadata().file_metadata();
+        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
         let arrow_schema = parquet_to_arrow_schema(
             file_metadata.schema_descr(),
             file_metadata.key_value_metadata(),
@@ -355,7 +351,7 @@
         let array_reader = build_array_reader(
             file_reader.metadata().file_metadata().schema_descr_ptr(),
             Arc::new(arrow_schema),
-            vec![0usize].into_iter(),
+            mask,
             Box::new(file_reader),
         )
         .unwrap();
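
A note on the indices passed to ProjectionMask::leaves: they identify parquet leaf columns in schema order, the same numbering the removed column_indices iterator used. A sketch against a hypothetical nested schema (not from this repository):

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Hypothetical parquet schema, for illustration only:
//
//   message schema {
//     required group a {
//       required int32 b;    // leaf index 0
//       required binary c;   // leaf index 1
//     }
//     required double d;     // leaf index 2
//   }
fn select_a_b(descr: &SchemaDescriptor) -> ProjectionMask {
    // Selects only the leaf `a.b`; per the old doc comment on
    // get_schema_by_columns, leaf selection addresses `a.b` rather than
    // pulling in all of `a`.
    ProjectionMask::leaves(descr, [0])
}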
9 changes: 6 additions & 3 deletions parquet/src/arrow/array_reader/list_array.rs
@@ -246,7 +246,7 @@ mod tests {
     use crate::arrow::array_reader::build_array_reader;
     use crate::arrow::array_reader::list_array::ListArrayReader;
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
+    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
     use crate::file::properties::WriterProperties;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::schema::parser::parse_message_type;
@@ -582,10 +582,13 @@
         )
         .unwrap();
 
+        let schema = file_metadata.schema_descr_ptr();
+        let mask = ProjectionMask::leaves(&schema, vec![0]);
+
         let mut array_reader = build_array_reader(
-            file_reader.metadata().file_metadata().schema_descr_ptr(),
+            schema,
             Arc::new(arrow_schema),
-            vec![0usize].into_iter(),
+            mask,
             Box::new(file_reader),
         )
         .unwrap();
111 changes: 43 additions & 68 deletions parquet/src/arrow/arrow_reader.rs
@@ -27,9 +27,8 @@ use arrow::{array::StructArray, error::ArrowError};
 
 use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::parquet_to_arrow_schema;
-use crate::arrow::schema::{
-    parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
-};
+use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
+use crate::arrow::ProjectionMask;
 use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::FileReader;
@@ -44,15 +43,8 @@ pub trait ArrowReader {
     fn get_schema(&mut self) -> Result<Schema>;
 
     /// Read parquet schema and convert it into arrow schema.
-    /// This schema only includes columns identified by `column_indices`.
-    /// To select leaf columns (i.e. `a.b.c` instead of `a`), set `leaf_columns = true`
-    fn get_schema_by_columns<T>(
-        &mut self,
-        column_indices: T,
-        leaf_columns: bool,
-    ) -> Result<Schema>
-    where
-        T: IntoIterator<Item = usize>;
+    /// This schema only includes columns identified by `mask`.
+    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema>;
 
     /// Returns record batch reader from whole parquet file.
     ///
@@ -64,19 +56,17 @@
     fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
 
     /// Returns record batch reader whose record batch contains columns identified by
-    /// `column_indices`.
+    /// `mask`.
     ///
     /// # Arguments
     ///
-    /// `column_indices`: The columns that should be included in record batches.
+    /// `mask`: The columns that should be included in record batches.
     /// `batch_size`: Please refer to `get_record_reader`.
-    fn get_record_reader_by_columns<T>(
+    fn get_record_reader_by_columns(
         &mut self,
-        column_indices: T,
+        mask: ProjectionMask,
         batch_size: usize,
-    ) -> Result<Self::RecordReader>
-    where
-        T: IntoIterator<Item = usize>;
+    ) -> Result<Self::RecordReader>;
 }
 
 #[derive(Debug, Clone, Default)]
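
The removed leaf_columns: bool flag is subsumed by how the mask is constructed. A sketch of the two selection styles after this change; ProjectionMask::roots is an assumption inferred from the removed parquet_to_arrow_schema_by_root_columns path, so verify it against the crate docs:

use arrow::datatypes::Schema;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
use parquet::schema::types::SchemaDescriptor;

fn projected_schemas(
    reader: &mut ParquetFileArrowReader,
    descr: &SchemaDescriptor,
) -> (Schema, Schema) {
    // Leaf-level selection (old API: get_schema_by_columns(indices, true)).
    let by_leaves = reader
        .get_schema_by_columns(ProjectionMask::leaves(descr, [3, 8, 10]))
        .unwrap();

    // Root-level selection (old API: get_schema_by_columns(indices, false)).
    // Assumption: a `roots` constructor exists alongside `leaves`.
    let by_roots = reader
        .get_schema_by_columns(ProjectionMask::roots(descr, [0]))
        .unwrap();

    (by_leaves, by_roots)
}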
@@ -118,59 +108,34 @@ impl ArrowReader for ParquetFileArrowReader {
         parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
     }
 
-    fn get_schema_by_columns<T>(
-        &mut self,
-        column_indices: T,
-        leaf_columns: bool,
-    ) -> Result<Schema>
-    where
-        T: IntoIterator<Item = usize>,
-    {
+    fn get_schema_by_columns(&mut self, mask: ProjectionMask) -> Result<Schema> {
         let file_metadata = self.file_reader.metadata().file_metadata();
-        if leaf_columns {
-            parquet_to_arrow_schema_by_columns(
-                file_metadata.schema_descr(),
-                column_indices,
-                self.get_kv_metadata(),
-            )
-        } else {
-            parquet_to_arrow_schema_by_root_columns(
-                file_metadata.schema_descr(),
-                column_indices,
-                self.get_kv_metadata(),
-            )
-        }
+        parquet_to_arrow_schema_by_columns(
+            file_metadata.schema_descr(),
+            mask,
+            self.get_kv_metadata(),
+        )
     }
 
     fn get_record_reader(
         &mut self,
         batch_size: usize,
     ) -> Result<ParquetRecordBatchReader> {
-        let column_indices = 0..self
-            .file_reader
-            .metadata()
-            .file_metadata()
-            .schema_descr()
-            .num_columns();
-
-        self.get_record_reader_by_columns(column_indices, batch_size)
+        self.get_record_reader_by_columns(ProjectionMask::all(), batch_size)
     }
 
-    fn get_record_reader_by_columns<T>(
+    fn get_record_reader_by_columns(
         &mut self,
-        column_indices: T,
+        mask: ProjectionMask,
         batch_size: usize,
-    ) -> Result<ParquetRecordBatchReader>
-    where
-        T: IntoIterator<Item = usize>,
-    {
+    ) -> Result<ParquetRecordBatchReader> {
         let array_reader = build_array_reader(
             self.file_reader
                 .metadata()
                 .file_metadata()
                 .schema_descr_ptr(),
             Arc::new(self.get_schema()?),
-            column_indices,
+            mask,
             Box::new(self.file_reader.clone()),
         )?;
 
@@ -296,7 +261,7 @@ mod tests {
         IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
     };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
-    use crate::arrow::ArrowWriter;
+    use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
     use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
@@ -351,12 +316,14 @@ mod tests {
         let parquet_file_reader =
             get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
 
-        let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
+        let file_metadata = parquet_file_reader.metadata().file_metadata();
+        let max_len = file_metadata.num_rows() as usize;
 
+        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [2]);
         let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
 
         let mut record_batch_reader = arrow_reader
-            .get_record_reader_by_columns(vec![2], 60)
+            .get_record_reader_by_columns(mask, 60)
             .expect("Failed to read into array!");
 
         // Verify that the schema was correctly parsed
@@ -1040,8 +1007,11 @@ mod tests {
         // (see: ARROW-11452)
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_structs.rust.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+        let file = File::open(&path).unwrap();
+        let parquet_file_reader = SerializedFileReader::try_from(file).unwrap();
+        let file_metadata = parquet_file_reader.metadata().file_metadata();
+        let schema = file_metadata.schema_descr_ptr();
+
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
@@ -1051,12 +1021,11 @@
             batch.unwrap();
         }
 
+        let mask = ProjectionMask::leaves(&schema, [3, 8, 10]);
         let projected_reader = arrow_reader
-            .get_record_reader_by_columns(vec![3, 8, 10], 60)
-            .unwrap();
-        let projected_schema = arrow_reader
-            .get_schema_by_columns(vec![3, 8, 10], true)
+            .get_record_reader_by_columns(mask.clone(), 60)
             .unwrap();
+        let projected_schema = arrow_reader.get_schema_by_columns(mask).unwrap();
 
         let expected_schema = Schema::new(vec![
             Field::new(
@@ -1139,8 +1108,11 @@ mod tests {
         }
 
         let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
+        let file_metadata = file_reader.metadata().file_metadata();
+        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+
         let mut batch = ParquetFileArrowReader::new(file_reader);
-        let reader = batch.get_record_reader_by_columns(vec![0], 1024).unwrap();
+        let reader = batch.get_record_reader_by_columns(mask, 1024).unwrap();
 
         let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
             "group",
@@ -1178,7 +1150,7 @@ mod tests {
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
 
         let mut record_batch_reader = arrow_reader
-            .get_record_reader_by_columns(vec![0], 10)
+            .get_record_reader_by_columns(ProjectionMask::all(), 10)
             .unwrap();
 
         let error = record_batch_reader.next().unwrap().unwrap_err();
@@ -1414,10 +1386,13 @@ mod tests {
         let path = format!("{}/alltypes_plain.parquet", testdata);
         let file = File::open(&path).unwrap();
         let reader = SerializedFileReader::try_from(file).unwrap();
-        let expected_rows = reader.metadata().file_metadata().num_rows() as usize;
+        let file_metadata = reader.metadata().file_metadata();
+        let expected_rows = file_metadata.num_rows() as usize;
+        let schema = file_metadata.schema_descr_ptr();
 
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
-        let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();
+        let mask = ProjectionMask::leaves(&schema, []);
+        let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap();
 
         let mut total_rows = 0;
         for maybe_batch in batch_reader {
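
The final hunk is truncated, but its setup shows the empty-projection pattern: a mask selecting no leaves still drives the reader, so row counts can be recovered without decoding column data. A sketch of how the visible loop plausibly completes; the helper signature and the zero-column row-count behaviour are assumptions, not part of the diff:

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
use parquet::schema::types::SchemaDescPtr;

fn count_rows(reader: &mut ParquetFileArrowReader, schema: SchemaDescPtr) -> usize {
    // Project zero leaf columns; batches should still report num_rows().
    let mask = ProjectionMask::leaves(&schema, []);
    let batch_reader = reader.get_record_reader_by_columns(mask, 2).unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        total_rows += maybe_batch.unwrap().num_rows();
    }
    total_rows
}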