Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion rust/parquet/src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,25 @@
//! assert_eq!(row_group_reader.num_columns(), 1);
//! }
//! ```

//! # Example of reading multiple files
//!
//! ```rust,no_run
//! use parquet::file::reader::SerializedFileReader;
//! use std::convert::TryFrom;
//!
//! let paths = vec![
//! "/path/to/sample.parquet/part-1.snappy.parquet",
//! "/path/to/sample.parquet/part-2.snappy.parquet"
//! ];
//! // Create a reader for each file and flat map rows
//! let rows = paths.iter()
//! .map(|p| SerializedFileReader::try_from(*p).unwrap())
//! .flat_map(|r| r.into_iter());
//!
//! for row in rows {
//! println!("{}", row);
//! }
//! ```
pub mod metadata;
pub mod properties;
pub mod reader;
Expand Down
55 changes: 55 additions & 0 deletions rust/parquet/src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::{
self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType,
};
Expand Down Expand Up @@ -313,6 +314,17 @@ impl<'a> TryFrom<&'a str> for SerializedFileReader<File> {
}
}

/// Conversion into a [`RowIter`](crate::record::reader::RowIter)
/// using the full file schema over all row groups.
impl IntoIterator for SerializedFileReader<File> {
type Item = Row;
type IntoIter = RowIter<'static>;

fn into_iter(self) -> Self::IntoIter {
RowIter::from_file_into(Box::new(self))
}
}

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<R: ParquetReader> {
buf: BufReader<R>,
Expand Down Expand Up @@ -639,6 +651,8 @@ mod tests {
use parquet_format::TypeDefinedOrder;

use crate::basic::SortOrder;
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::test_common::{get_temp_file, get_test_file, get_test_path};

#[test]
Expand Down Expand Up @@ -796,6 +810,47 @@ mod tests {
assert!(reader.is_err());
}

#[test]
fn test_file_reader_into_iter() -> Result<()> {
let path = get_test_path("alltypes_plain.parquet");
let vec = vec![path.clone(), path.clone()]
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| r.into_iter())
.flat_map(|r| r.get_int(0))
.collect::<Vec<_>>();

// rows in the parquet file are not sorted by "id"
// each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1]
assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]);

Ok(())
}

#[test]
fn test_file_reader_into_iter_project() -> Result<()> {
let path = get_test_path("alltypes_plain.parquet");
let result = vec![path]
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| {
let schema = "message schema { OPTIONAL INT32 id; }";
let proj = parse_message_type(&schema).ok();

r.into_iter().project(proj).unwrap()
})
.map(|r| format!("{}", r))
.collect::<Vec<_>>()
.join(",");

assert_eq!(
result,
"{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}"
);

Ok(())
}

#[test]
fn test_reuse_file_chunk() {
// This test covers the case of maintaining the correct start position in a file
Expand Down
192 changes: 155 additions & 37 deletions rust/parquet/src/record/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,36 +612,67 @@ impl fmt::Display for Reader {
// ----------------------------------------------------------------------
// Row iterators

/// The enum Either with variants That represet a reference and a box of
/// [`FileReader`](crate::file::reader::FileReader).
enum Either<'a> {
Left(&'a FileReader),
Right(Box<FileReader>),
}

impl<'a> Either<'a> {
fn reader(&self) -> &FileReader {
match *self {
Either::Left(r) => r,
Either::Right(ref r) => &**r,
}
}
}

/// Iterator of [`Row`](crate::record::api::Row)s.
/// It is used either for a single row group to iterate over data in that row group, or
/// an entire file with auto buffering of all row groups.
pub struct RowIter<'a> {
descr: SchemaDescPtr,
tree_builder: TreeBuilder,
file_reader: Option<&'a FileReader>,
file_reader: Option<Either<'a>>,
current_row_group: usize,
num_row_groups: usize,
row_iter: Option<ReaderIter>,
}

impl<'a> RowIter<'a> {
/// Creates a new iterator of [`Row`](crate::record::api::Row)s.
fn new(
file_reader: Option<Either<'a>>,
row_iter: Option<ReaderIter>,
descr: SchemaDescPtr,
) -> Self {
let tree_builder = Self::tree_builder();
let num_row_groups = match file_reader {
Some(ref r) => r.reader().num_row_groups(),
None => 0,
};

Self {
descr,
file_reader,
tree_builder,
num_row_groups,
row_iter: row_iter,
current_row_group: 0,
}
}

/// Creates iterator of [`Row`](crate::record::api::Row)s for all row groups in a
/// file.
pub fn from_file(proj: Option<Type>, reader: &'a FileReader) -> Result<Self> {
let either = Either::Left(reader);
let descr = Self::get_proj_descr(
proj,
reader.metadata().file_metadata().schema_descr_ptr(),
)?;
let num_row_groups = reader.num_row_groups();

Ok(Self {
descr,
tree_builder: Self::tree_builder(),
file_reader: Some(reader),
current_row_group: 0,
num_row_groups,
row_iter: None,
})
Ok(Self::new(Some(either), None, descr))
}

/// Creates iterator of [`Row`](crate::record::api::Row)s for a specific row group.
Expand All @@ -655,21 +686,41 @@ impl<'a> RowIter<'a> {

// For row group we need to set `current_row_group` >= `num_row_groups`, because
// we only have one row group and can't buffer more.
Ok(Self {
descr,
tree_builder,
file_reader: None,
current_row_group: 0,
num_row_groups: 0,
row_iter: Some(row_iter),
})
Ok(Self::new(None, Some(row_iter), descr))
}

/// Returns common tree builder, so the same settings are applied to both iterators
/// from file reader and row group.
#[inline]
fn tree_builder() -> TreeBuilder {
TreeBuilder::new()
/// Creates a iterator of [`Row`](crate::record::api::Row)s from a
/// [`FileReader`](crate::file::reader::FileReader) using the full file schema.
pub fn from_file_into(reader: Box<FileReader>) -> Self {
let either = Either::Right(reader);
let descr = either
.reader()
.metadata()
.file_metadata()
.schema_descr_ptr();

Self::new(Some(either), None, descr)
}

/// Tries to create a iterator of [`Row`](crate::record::api::Row)s using projections.
/// Returns a error if a file reader is not the source of this iterator.
///
/// The Projected schema can be a subset of or equal to the file schema,
/// when it is None, full file schema is assumed.
pub fn project(self, proj: Option<Type>) -> Result<Self> {
match self.file_reader {
Some(ref either) => {
let schema = either
.reader()
.metadata()
.file_metadata()
.schema_descr_ptr();
let descr = Self::get_proj_descr(proj, schema)?;

Ok(Self::new(self.file_reader, None, descr))
}
None => Err(general_err!("File reader is required to use projections")),
}
}

/// Helper method to get schema descriptor for projected schema.
Expand All @@ -691,6 +742,13 @@ impl<'a> RowIter<'a> {
None => Ok(root_descr),
}
}

/// Returns common tree builder, so the same settings are applied to both iterators
/// from file reader and row group.
#[inline]
fn tree_builder() -> TreeBuilder {
TreeBuilder::new()
}
}

impl<'a> Iterator for RowIter<'a> {
Expand All @@ -705,18 +763,21 @@ impl<'a> Iterator for RowIter<'a> {
while row.is_none() && self.current_row_group < self.num_row_groups {
// We do not expect any failures when accessing a row group, and file reader
// must be set for selecting next row group.
let row_group_reader = &*self
.file_reader
.as_ref()
.expect("File reader is required to advance row group")
.get_row_group(self.current_row_group)
.unwrap();
self.current_row_group += 1;
let mut iter = self
.tree_builder
.as_iter(self.descr.clone(), row_group_reader);
row = iter.next();
self.row_iter = Some(iter);
if let Some(ref either) = self.file_reader {
let file_reader = either.reader();
let row_group_reader = &*file_reader
.get_row_group(self.current_row_group)
.expect("Row group is required to advance");

let mut iter = self
.tree_builder
.as_iter(self.descr.clone(), row_group_reader);

row = iter.next();

self.current_row_group += 1;
self.row_iter = Some(iter);
}
}

row
Expand Down Expand Up @@ -759,9 +820,10 @@ mod tests {

use crate::errors::{ParquetError, Result};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::record::api::{Field, Row};
use crate::record::api::{Field, Row, RowAccessor, RowFormatter};
use crate::schema::parser::parse_message_type;
use crate::util::test_common::get_test_file;
use crate::util::test_common::{get_test_file, get_test_path};
use std::convert::TryFrom;

// Convenient macros to assemble row, list, map, and group.

Expand Down Expand Up @@ -1440,6 +1502,62 @@ mod tests {
test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
}

#[test]
fn test_file_reader_iter() -> Result<()> {
let path = get_test_path("alltypes_plain.parquet");
let vec = vec![path]
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| RowIter::from_file_into(Box::new(r)))
.flat_map(|r| r.get_int(0))
.collect::<Vec<_>>();

assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1]);

Ok(())
}

#[test]
fn test_file_reader_iter_projection() -> Result<()> {
let path = get_test_path("alltypes_plain.parquet");
let values = vec![path]
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| {
let schema = "message schema { OPTIONAL INT32 id; }";
let proj = parse_message_type(&schema).ok();

RowIter::from_file_into(Box::new(r)).project(proj).unwrap()
})
.map(|r| format!("id:{}", r.fmt(0)))
.collect::<Vec<_>>()
.join(", ");

assert_eq!(values, "id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1");

Ok(())
}

#[test]
fn test_file_reader_iter_projection_err() {
let schema = "
message spark_schema {
REQUIRED INT32 key;
REQUIRED BOOLEAN value;
}
";
let proj = parse_message_type(&schema).ok();
let path = get_test_path("nested_maps.snappy.parquet");
let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
let res = RowIter::from_file_into(Box::new(reader)).project(proj);

assert!(res.is_err());
assert_eq!(
res.err().unwrap(),
general_err!("Root schema does not contain projection")
);
}

#[test]
fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
// Array field `phoneNumbers` does not contain LIST annotation.
Expand Down