Add ParquetFileArrowReader::try_new #1782

Merged · 3 commits · Jun 3, 2022
92 changes: 51 additions & 41 deletions parquet/src/arrow/arrow_reader.rs
@@ -31,7 +31,7 @@ use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
 use crate::arrow::ProjectionMask;
 use crate::errors::Result;
 use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::reader::FileReader;
+use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::schema::types::SchemaDescriptor;

 /// Arrow reader api.
@@ -145,15 +145,40 @@ impl ArrowReader for ParquetFileArrowReader {
 }

 impl ParquetFileArrowReader {
-    /// Create a new [`ParquetFileArrowReader`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use bytes::Bytes;
+    /// # use parquet::arrow::ParquetFileArrowReader;
+    ///
+    /// let file = File::open("file.parquet").unwrap();
+    /// let reader = ParquetFileArrowReader::try_new(file).unwrap();
+    ///
+    /// let bytes = Bytes::from(vec![]);
+    /// let reader = ParquetFileArrowReader::try_new(bytes).unwrap();
+    /// ```
+    pub fn try_new<R: ChunkReader + 'static>(chunk_reader: R) -> Result<Self> {
+        Self::try_new_with_options(chunk_reader, Default::default())
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ChunkReader`]
+    /// and [`ArrowReaderOptions`]
+    pub fn try_new_with_options<R: ChunkReader + 'static>(
+        chunk_reader: R,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
+        Ok(Self::new_with_options(file_reader, options))
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
     pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self {
-            file_reader,
-            options: Default::default(),
-        }
+        Self::new_with_options(file_reader, Default::default())
     }

-    /// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`Arc<dyn FileReader>`]
+    /// and [`ArrowReaderOptions`]
     pub fn new_with_options(
         file_reader: Arc<dyn FileReader>,
         options: ArrowReaderOptions,
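The two constructors above are the heart of this PR: `try_new` accepts anything implementing `ChunkReader` and wraps it in a `SerializedFileReader` internally, while `try_new_with_options` does the same with explicit `ArrowReaderOptions`. A minimal sketch of the resulting call patterns; the `data.parquet` path, batch size, and loop body are illustrative, not part of this diff:

use std::fs::File;

use bytes::Bytes;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};

fn main() {
    // A `File` satisfies `ChunkReader`...
    let file = File::open("data.parquet").unwrap();
    let mut reader = ParquetFileArrowReader::try_new(file).unwrap();

    // ...as does an in-memory buffer such as `Bytes`.
    let bytes = Bytes::from(std::fs::read("data.parquet").unwrap());
    let mut reader = ParquetFileArrowReader::try_new(bytes).unwrap();

    // The options variant is identical; the diff itself defaults the
    // options via `Default::default()`.
    let file = File::open("data.parquet").unwrap();
    let mut reader =
        ParquetFileArrowReader::try_new_with_options(file, Default::default()).unwrap();

    // Read record batches exactly as before the change.
    for batch in reader.get_record_reader(1024).unwrap() {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}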
@@ -369,8 +394,7 @@ mod tests {

         file.rewind().unwrap();

-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_reader = arrow_reader.get_record_reader(2).unwrap();

         let batches = record_reader.collect::<ArrowResult<Vec<_>>>().unwrap();
@@ -601,9 +625,8 @@ mod tests {
         let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
         for (prefix, target_precision) in file_variants {
             let path = format!("{}/{}_decimal.parquet", testdata, prefix);
-            let parquet_reader =
-                SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-            let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+            let file = File::open(&path).unwrap();
+            let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();

             let mut record_reader = arrow_reader.get_record_reader(32).unwrap();

@@ -871,9 +894,7 @@ mod tests {

         file.rewind().unwrap();

-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_reader = arrow_reader
             .get_record_reader(opts.record_batch_size)
             .unwrap();
@@ -1022,11 +1043,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_structs.rust.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let parquet_file_reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = parquet_file_reader.metadata().file_metadata();
-        let schema = file_metadata.schema_descr_ptr();
-
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1035,7 +1052,7 @@ mod tests {
             batch.unwrap();
         }

-        let mask = ProjectionMask::leaves(&schema, [3, 8, 10]);
+        let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [3, 8, 10]);
         let projected_reader = arrow_reader
             .get_record_reader_by_columns(mask.clone(), 60)
             .unwrap();
@@ -1075,9 +1092,8 @@ mod tests {
     fn test_read_maps() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/nested_maps.snappy.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1124,14 +1140,12 @@ mod tests {
             writer.close().unwrap();
         }

-        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-        let file_metadata = file_reader.metadata().file_metadata();
-        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+        let mut reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(reader.parquet_schema(), [0]);

-        let mut batch = ParquetFileArrowReader::new(file_reader);
-        let reader = batch.get_record_reader_by_columns(mask, 1024).unwrap();
+        let reader = reader.get_record_reader_by_columns(mask, 1024).unwrap();

-        let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
+        let expected_schema = Schema::new(vec![Field::new(
             "group",
             ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)]),
             true,
@@ -1163,9 +1177,7 @@ mod tests {
         ];

         let file = Bytes::from(data);
-        let file_reader = SerializedFileReader::new(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader_by_columns(ProjectionMask::all(), 10)
             .unwrap();
@@ -1241,8 +1253,7 @@ mod tests {

         file.rewind().unwrap();

-        let parquet_reader = SerializedFileReader::try_from(file).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();

         let record_reader = arrow_reader.get_record_reader(3).unwrap();

@@ -1280,9 +1291,8 @@ mod tests {
     fn test_read_null_list() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/null_list.parquet", testdata);
-        let parquet_file_reader =
-            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let file = File::open(&path).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let mut record_batch_reader = arrow_reader
             .get_record_reader(60)
             .expect("Failed to read into array!");
@@ -1402,12 +1412,12 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{}/alltypes_plain.parquet", testdata);
         let file = File::open(&path).unwrap();
-        let reader = SerializedFileReader::try_from(file).unwrap();
-        let file_metadata = reader.metadata().file_metadata();
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let file_metadata = arrow_reader.metadata().file_metadata();

         let expected_rows = file_metadata.num_rows() as usize;
         let schema = file_metadata.schema_descr_ptr();

-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let mask = ProjectionMask::leaves(&schema, []);
         let batch_reader = arrow_reader.get_record_reader_by_columns(mask, 2).unwrap();

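Every test change in this file follows one idiom: whatever was previously pulled off an explicit `SerializedFileReader`, such as the schema descriptor for a `ProjectionMask`, now comes from the `ParquetFileArrowReader` itself via `parquet_schema()`. A sketch of that replacement pattern, assuming a hypothetical `data.parquet` and leaf column 0:

use std::fs::File;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};

fn main() {
    let file = File::open("data.parquet").unwrap();
    let mut reader = ParquetFileArrowReader::try_new(file).unwrap();

    // Build the projection from the reader's own parquet schema; no
    // separate FileReader or FileMetaData is required any more.
    let mask = ProjectionMask::leaves(reader.parquet_schema(), [0]);

    for batch in reader.get_record_reader_by_columns(mask, 1024).unwrap() {
        println!("projected batch: {} columns", batch.unwrap().num_columns());
    }
}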
19 changes: 8 additions & 11 deletions parquet/src/arrow/arrow_writer.rs
@@ -752,8 +752,7 @@ mod tests {
         }

         let cursor = Bytes::from(buffer);
-        let reader = SerializedFileReader::new(cursor).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(cursor).unwrap();
        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();

         let actual_batch = record_batch_reader
@@ -1188,8 +1187,8 @@ mod tests {
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();

-        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut arrow_reader =
+            ParquetFileArrowReader::try_new(file.try_clone().unwrap()).unwrap();
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();

         let actual_batch = record_batch_reader
@@ -1918,10 +1917,9 @@ mod tests {

         writer.close().unwrap();

-        let reader = SerializedFileReader::new(file).unwrap();
-        assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]);
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[200, 200, 50]);

-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let batches = arrow_reader
             .get_record_reader(100)
             .unwrap()
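The row-group assertions above rely on `metadata()` exposed directly by the arrow reader; `row_group_sizes` is a helper local to these tests, not a library API. A sketch of the same inspection through the new constructor, file name hypothetical:

use std::fs::File;

use parquet::arrow::ParquetFileArrowReader;

fn main() {
    let file = File::open("data.parquet").unwrap();
    let reader = ParquetFileArrowReader::try_new(file).unwrap();

    // `metadata()` returns the `ParquetMetaData` that previously required a
    // standalone `SerializedFileReader` to obtain.
    for rg in reader.metadata().row_groups() {
        println!("row group: {} rows", rg.num_rows());
    }
}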
@@ -2061,13 +2059,12 @@ mod tests {
         writer.close().unwrap();

         // Read Data
-        let reader = SerializedFileReader::new(file).unwrap();
-
         // Should have written entire first batch and first row of second to the first row group
         // leaving a single row in the second row group
-        assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]);
-
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        assert_eq!(&row_group_sizes(arrow_reader.metadata()), &[6, 1]);

         let batches = arrow_reader
             .get_record_reader(2)
             .unwrap()
7 changes: 2 additions & 5 deletions parquet/src/arrow/mod.rs
@@ -96,12 +96,9 @@
 //! # writer.close().unwrap();
 //!
 //! let file = File::open("data.parquet").unwrap();
-//! let file_reader = SerializedFileReader::new(file).unwrap();
-//!
-//! let file_metadata = file_reader.metadata().file_metadata();
-//! let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
-//!
-//! let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+//! let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+//! let mask = ProjectionMask::leaves(arrow_reader.parquet_schema(), [0]);
 //!
 //! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
 //! println!("Arrow schema after projection is: {}",
25 changes: 14 additions & 11 deletions parquet/src/arrow/schema.rs
@@ -478,11 +478,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
 mod tests {
     use super::*;

-    use std::{collections::HashMap, convert::TryFrom, sync::Arc};
+    use std::{collections::HashMap, sync::Arc};

     use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

-    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+    use crate::file::metadata::KeyValue;
     use crate::{
         arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
         schema::{parser::parse_message_type, types::SchemaDescriptor},
@@ -571,9 +571,12 @@ mod tests {
         ];
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());

-        let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
-                .unwrap();
+        let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
+            &parquet_schema,
+            ProjectionMask::all(),
+            None,
+        )
+        .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
     }

@@ -1599,13 +1602,13 @@ mod tests {
         writer.close()?;

         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);

         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);

         Ok(())
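The schema round-trip tests now obtain both the full and the projected schema from the reader alone. A condensed sketch of that assertion pattern; the file name is hypothetical, and the assert mirrors the tests' expectation that a full projection equals the full schema:

use std::fs::File;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};

fn main() {
    let file = File::open("data.parquet").unwrap();
    let mut reader = ParquetFileArrowReader::try_new(file).unwrap();

    let schema = reader.get_schema().unwrap();
    let projected = reader.get_schema_by_columns(ProjectionMask::all()).unwrap();

    // Projecting every column should reproduce the full arrow schema.
    assert_eq!(schema, projected);
}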
@@ -1668,13 +1671,13 @@ mod tests {
         writer.close()?;

         // read file back
-        let parquet_reader = SerializedFileReader::try_from(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
+        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
         let read_schema = arrow_reader.get_schema()?;
         assert_eq!(schema, read_schema);

         // read all fields by columns
-        let partial_read_schema = arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
+        let partial_read_schema =
+            arrow_reader.get_schema_by_columns(ProjectionMask::all())?;
         assert_eq!(schema, partial_read_schema);

         Ok(())