Add ParquetFileArrowReader::try_new (#1782)
* Add ParquetFileArrowReader::try_new

* Review feedback
tustvold committed Jun 3, 2022
1 parent eb706b7 commit 940b5b5
Showing 4 changed files with 75 additions and 68 deletions.
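
In short, the commit replaces the old two-step construction, building a `SerializedFileReader` and wrapping it in an `Arc`, with a single fallible constructor that accepts any `ChunkReader` (such as `File` or `Bytes`) directly. A minimal before/after sketch distilled from the diff below (the `data.parquet` path is illustrative):

    // Before: construct a SerializedFileReader by hand and wrap it in an Arc
    let file = File::open("data.parquet").unwrap();
    let file_reader = SerializedFileReader::new(file).unwrap();
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

    // After: a single fallible constructor taking any ChunkReader
    let file = File::open("data.parquet").unwrap();
    let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();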
92 changes: 51 additions & 41 deletions parquet/src/arrow/arrow_reader.rs
@@ -31,7 +31,7 @@ use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
 use crate::arrow::ProjectionMask;
 use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::reader::FileReader;
+use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::schema::types::SchemaDescriptor;
 
 /// Arrow reader api.
@@ -145,15 +145,40 @@ impl ArrowReader for ParquetFileArrowReader {
 }
 
 impl ParquetFileArrowReader {
-    /// Create a new [`ParquetFileArrowReader`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use bytes::Bytes;
+    /// # use parquet::arrow::ParquetFileArrowReader;
+    ///
+    /// let file = File::open("file.parquet").unwrap();
+    /// let reader = ParquetFileArrowReader::try_new(file).unwrap();
+    ///
+    /// let bytes = Bytes::from(vec![]);
+    /// let reader = ParquetFileArrowReader::try_new(bytes).unwrap();
+    /// ```
+    pub fn try_new<R: ChunkReader + 'static>(chunk_reader: R) -> Result<Self> {
+        Self::try_new_with_options(chunk_reader, Default::default())
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    /// and [`ArrowReaderOptions`]
+    pub fn try_new_with_options<R: ChunkReader + 'static>(
+        chunk_reader: R,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+        Ok(Self::new_with_options(file_reader, options))
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
     pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self {
-            file_reader,
-            options: Default::default(),
-        }
+        Self::new_with_options(file_reader, Default::default())
     }
 
-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
+    /// and [`ArrowReaderOptions`]
     pub fn new_with_options(
         file_reader: Arc<dyn FileReader>,
         options: ArrowReaderOptions,
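
The options-taking variant follows the same shape; a hedged sketch of calling it directly, passing the same `Default::default()` that `try_new` forwards internally (`data.parquet` again illustrative):

    let file = File::open("data.parquet").unwrap();
    let mut arrow_reader =
        ParquetFileArrowReader::try_new_with_options(file, Default::default()).unwrap();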
@@ -369,8 +394,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_reader = arrow_reader.get_record_reader(2).unwrap();
 
         let batches = record_reader.collect::<ArrowResult<Vec<_>>>().unwrap();
@@ -601,9 +625,8 @@ mod tests {
         let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
         for (prefix, target_precision) in file_variants {
             let path = format!("{}/{}_decimal.parquet", testdata, prefix);
-            let parquet_reader =
-                SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-            let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+            let file = File::open(&path).unwrap();
+            let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
 
             let mut record_reader = arrow_reader.get_record_reader(32).unwrap();

@@ -871,9 +894,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_reader = arrow_reader
             .get_record_reader(opts.record_batch_size)
             .unwrap();
@@ -1022,11 +1043,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_structs.rust.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let parquet_file_reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = parquet_file_reader.metadata().file_metadata();
-        let schema = file_metadata.schema_descr_ptr();
-
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1035,7 +1052,7 @@
             batch.unwrap();
         }
 
-        let mask = ProjectionMask::leaves(&schema, [3, 8, 10]);
+        let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [3, 8, 10]);
         let projected_reader = arrow_reader
             .get_record_reader_by_columns(mask.clone(), 60)
             .unwrap();
@@ -1075,9 +1092,8 @@ mod tests {
     fn test_read_maps() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_maps.snappy.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1124,14 +1140,12 @@ mod tests {
             writer.close().unwrap();
         }
 
-        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-        let file_metadata = file_reader.metadata().file_metadata();
-        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+        let mut reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(reader.parquet_schema(), [0]);
 
-        let mut batch = ParquetFileArrowReader::new(file_reader);
-        let reader = batch.get_record_reader_by_columns(mask, 1024).unwrap();
+        let reader = reader.get_record_reader_by_columns(mask, 1024).unwrap();
 
-        let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
+        let expected_schema = Schema::new(vec![Field::new(
             "group",
             ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)]),
             true,
@@ -1163,9 +1177,7 @@ mod tests {
         ];
 
         let file = Bytes::from(data);
-        let file_reader = SerializedFileReader::new(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader_by_columns(ProjectionMask::all(), 10)
             .unwrap();
@@ -1241,8 +1253,7 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
 
         let record_reader = arrow_reader.get_record_reader(3).unwrap();

@@ -1280,9 +1291,8 @@ mod tests {
     fn test_read_null_list() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/null_list.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1402,12 +1412,12 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/alltypes_plain.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = reader.metadata().file_metadata();
+
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let file_metadata = arrow_reader.metadata().file_metadata();
         let expected_rows = file_metadata.num_rows() as usize;
         let schema = file_metadata.schema_descr_ptr();
 
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let mask = ProjectionMask::leaves(&schema, []);
         let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap();

19 changes: 8 additions & 11 deletions parquet/src/arrow/arrow_writer.rs
@@ -752,8 +752,7 @@ mod tests {
         }
 
         let cursor = Bytes::from(buffer);
-        let reader = SerializedFileReader::new(cursor).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(cursor).unwrap();
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
         let actual_batch = record_batch_reader
@@ -1188,8 +1187,8 @@ mod tests {
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader =
+            ParquetFileArrowReader::try_new(file.try_clone().unwrap()).unwrap();
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
         let actual_batch = record_batch_reader
@@ -1918,10 +1917,9 @@ mod tests {
 
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file).unwrap();
-        assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]);
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[200, 200, 50]);
 
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let batches = arrow_reader
             .get_record_reader(100)
             .unwrap()
@@ -2061,13 +2059,12 @@ mod tests {
         writer.close().unwrap();
 
         // Read Data
-        let reader = SerializedFileReader::new(file).unwrap();
-
         // Should have written entire first batch and first row of second to the first row group
         // leaving a single row in the second row group
-        assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]);
 
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[6, 1]);
+
         let batches = arrow_reader
             .get_record_reader(2)
             .unwrap()
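
The writer tests above hand an in-memory `Bytes` buffer straight to `try_new`, so a write/read round-trip needs no filesystem at all. A condensed sketch of that pattern, assuming an existing `RecordBatch` named `batch` (the batch size 1024 mirrors the tests):

    // Write a RecordBatch into an in-memory buffer...
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // ...then read it back through the new constructor.
    let mut arrow_reader = ParquetFileArrowReader::try_new(Bytes::from(buffer)).unwrap();
    let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
    let actual_batch = record_batch_reader.next().unwrap().unwrap();
    assert_eq!(actual_batch, batch);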
7 changes: 2 additions & 5 deletions parquet/src/arrow/mod.rs
@@ -96,12 +96,9 @@
 //! # writer.close().unwrap();
 //!
 //! let file = File::open("data.parquet").unwrap();
-//! let file_reader = SerializedFileReader::new(file).unwrap();
 //!
-//! let file_metadata = file_reader.metadata().file_metadata();
-//! let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
-//!
-//! let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+//! let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+//! let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [0]);
 //!
 //! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
 //! println!("Arrow schema after projection is: {}",
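
With `parquet_schema()` exposed on the reader, the projection in this doc example no longer needs a separately constructed `SchemaDescriptor`. A speculative continuation of the example, reading the single projected column (the assertion assumes leaf 0 maps to one top-level field):

    let mut record_reader = arrow_reader.get_record_reader_by_columns(mask, 1024).unwrap();
    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 1);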
25 changes: 14 additions & 11 deletions parquet/src/arrow/schema.rs
@@ -478,11 +478,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
 mod tests {
     use super::*;
 
-    use std::{collections::HashMap, convert::TryFrom, sync::Arc};
+    use std::{collections::HashMap, sync::Arc};
 
     use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
 
-    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+    use crate::file::metadata::KeyValue;
     use crate::{
         arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
         schema::{parser::parse_message_type, types::SchemaDescriptor},
@@ -571,9 +571,12 @@ mod tests {
         ];
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
 
-        let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
-                .unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
+            &parquet_schema,
+            ProjectionMask::all(),
+            None,
+        )
+        .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
     }

@@ -1599,13 +1602,13 @@ mod tests {
         writer.close()?;
 
         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);
 
         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);
 
         Ok(())
@@ -1668,13 +1671,13 @@ mod tests {
         writer.close()?;
 
         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);
 
         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);
 
         Ok(())
