diff --git a/rust/parquet/src/file/mod.rs b/rust/parquet/src/file/mod.rs index 407a97d5d6e..6dbf131b9c3 100644 --- a/rust/parquet/src/file/mod.rs +++ b/rust/parquet/src/file/mod.rs @@ -77,7 +77,25 @@ //! assert_eq!(row_group_reader.num_columns(), 1); //! } //! ``` - +//! # Example of reading multiple files +//! +//! ```rust,no_run +//! use parquet::file::reader::SerializedFileReader; +//! use std::convert::TryFrom; +//! +//! let paths = vec![ +//! "/path/to/sample.parquet/part-1.snappy.parquet", +//! "/path/to/sample.parquet/part-2.snappy.parquet" +//! ]; +//! // Create a reader for each file and flat map rows +//! let rows = paths.iter() +//! .map(|p| SerializedFileReader::try_from(*p).unwrap()) +//! .flat_map(|r| r.into_iter()); +//! +//! for row in rows { +//! println!("{}", row); +//! } +//! ``` pub mod metadata; pub mod properties; pub mod reader; diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index aa9d03dcae0..c2bfcfcd21e 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -42,6 +42,7 @@ use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC}; use crate::record::reader::RowIter; +use crate::record::Row; use crate::schema::types::{ self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, }; @@ -313,6 +314,17 @@ impl<'a> TryFrom<&'a str> for SerializedFileReader { } } +/// Conversion into a [`RowIter`](crate::record::reader::RowIter) +/// using the full file schema over all row groups. +impl IntoIterator for SerializedFileReader { + type Item = Row; + type IntoIter = RowIter<'static>; + + fn into_iter(self) -> Self::IntoIter { + RowIter::from_file_into(Box::new(self)) + } +} + /// A serialized implementation for Parquet [`RowGroupReader`]. pub struct SerializedRowGroupReader { buf: BufReader, @@ -639,6 +651,8 @@ mod tests { use parquet_format::TypeDefinedOrder; use crate::basic::SortOrder; + use crate::record::RowAccessor; + use crate::schema::parser::parse_message_type; use crate::util::test_common::{get_temp_file, get_test_file, get_test_path}; #[test] @@ -796,6 +810,47 @@ mod tests { assert!(reader.is_err()); } + #[test] + fn test_file_reader_into_iter() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let vec = vec![path.clone(), path.clone()] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| r.into_iter()) + .flat_map(|r| r.get_int(0)) + .collect::>(); + + // rows in the parquet file are not sorted by "id" + // each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1] + assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]); + + Ok(()) + } + + #[test] + fn test_file_reader_into_iter_project() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let result = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| { + let schema = "message schema { OPTIONAL INT32 id; }"; + let proj = parse_message_type(&schema).ok(); + + r.into_iter().project(proj).unwrap() + }) + .map(|r| format!("{}", r)) + .collect::>() + .join(","); + + assert_eq!( + result, + "{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}" + ); + + Ok(()) + } + #[test] fn test_reuse_file_chunk() { // This test covers the case of maintaining the correct start position in a file diff --git a/rust/parquet/src/record/reader.rs b/rust/parquet/src/record/reader.rs index fa8f9b5b5fb..4f225d9918f 100644 --- a/rust/parquet/src/record/reader.rs +++ b/rust/parquet/src/record/reader.rs @@ -612,36 +612,67 @@ impl fmt::Display for Reader { // ---------------------------------------------------------------------- // Row iterators +/// The enum Either with variants That represet a reference and a box of +/// [`FileReader`](crate::file::reader::FileReader). +enum Either<'a> { + Left(&'a FileReader), + Right(Box), +} + +impl<'a> Either<'a> { + fn reader(&self) -> &FileReader { + match *self { + Either::Left(r) => r, + Either::Right(ref r) => &**r, + } + } +} + /// Iterator of [`Row`](crate::record::api::Row)s. /// It is used either for a single row group to iterate over data in that row group, or /// an entire file with auto buffering of all row groups. pub struct RowIter<'a> { descr: SchemaDescPtr, tree_builder: TreeBuilder, - file_reader: Option<&'a FileReader>, + file_reader: Option>, current_row_group: usize, num_row_groups: usize, row_iter: Option, } impl<'a> RowIter<'a> { + /// Creates a new iterator of [`Row`](crate::record::api::Row)s. + fn new( + file_reader: Option>, + row_iter: Option, + descr: SchemaDescPtr, + ) -> Self { + let tree_builder = Self::tree_builder(); + let num_row_groups = match file_reader { + Some(ref r) => r.reader().num_row_groups(), + None => 0, + }; + + Self { + descr, + file_reader, + tree_builder, + num_row_groups, + row_iter: row_iter, + current_row_group: 0, + } + } + /// Creates iterator of [`Row`](crate::record::api::Row)s for all row groups in a /// file. pub fn from_file(proj: Option, reader: &'a FileReader) -> Result { + let either = Either::Left(reader); let descr = Self::get_proj_descr( proj, reader.metadata().file_metadata().schema_descr_ptr(), )?; - let num_row_groups = reader.num_row_groups(); - Ok(Self { - descr, - tree_builder: Self::tree_builder(), - file_reader: Some(reader), - current_row_group: 0, - num_row_groups, - row_iter: None, - }) + Ok(Self::new(Some(either), None, descr)) } /// Creates iterator of [`Row`](crate::record::api::Row)s for a specific row group. @@ -655,21 +686,41 @@ impl<'a> RowIter<'a> { // For row group we need to set `current_row_group` >= `num_row_groups`, because // we only have one row group and can't buffer more. - Ok(Self { - descr, - tree_builder, - file_reader: None, - current_row_group: 0, - num_row_groups: 0, - row_iter: Some(row_iter), - }) + Ok(Self::new(None, Some(row_iter), descr)) } - /// Returns common tree builder, so the same settings are applied to both iterators - /// from file reader and row group. - #[inline] - fn tree_builder() -> TreeBuilder { - TreeBuilder::new() + /// Creates a iterator of [`Row`](crate::record::api::Row)s from a + /// [`FileReader`](crate::file::reader::FileReader) using the full file schema. + pub fn from_file_into(reader: Box) -> Self { + let either = Either::Right(reader); + let descr = either + .reader() + .metadata() + .file_metadata() + .schema_descr_ptr(); + + Self::new(Some(either), None, descr) + } + + /// Tries to create a iterator of [`Row`](crate::record::api::Row)s using projections. + /// Returns a error if a file reader is not the source of this iterator. + /// + /// The Projected schema can be a subset of or equal to the file schema, + /// when it is None, full file schema is assumed. + pub fn project(self, proj: Option) -> Result { + match self.file_reader { + Some(ref either) => { + let schema = either + .reader() + .metadata() + .file_metadata() + .schema_descr_ptr(); + let descr = Self::get_proj_descr(proj, schema)?; + + Ok(Self::new(self.file_reader, None, descr)) + } + None => Err(general_err!("File reader is required to use projections")), + } } /// Helper method to get schema descriptor for projected schema. @@ -691,6 +742,13 @@ impl<'a> RowIter<'a> { None => Ok(root_descr), } } + + /// Returns common tree builder, so the same settings are applied to both iterators + /// from file reader and row group. + #[inline] + fn tree_builder() -> TreeBuilder { + TreeBuilder::new() + } } impl<'a> Iterator for RowIter<'a> { @@ -705,18 +763,21 @@ impl<'a> Iterator for RowIter<'a> { while row.is_none() && self.current_row_group < self.num_row_groups { // We do not expect any failures when accessing a row group, and file reader // must be set for selecting next row group. - let row_group_reader = &*self - .file_reader - .as_ref() - .expect("File reader is required to advance row group") - .get_row_group(self.current_row_group) - .unwrap(); - self.current_row_group += 1; - let mut iter = self - .tree_builder - .as_iter(self.descr.clone(), row_group_reader); - row = iter.next(); - self.row_iter = Some(iter); + if let Some(ref either) = self.file_reader { + let file_reader = either.reader(); + let row_group_reader = &*file_reader + .get_row_group(self.current_row_group) + .expect("Row group is required to advance"); + + let mut iter = self + .tree_builder + .as_iter(self.descr.clone(), row_group_reader); + + row = iter.next(); + + self.current_row_group += 1; + self.row_iter = Some(iter); + } } row @@ -759,9 +820,10 @@ mod tests { use crate::errors::{ParquetError, Result}; use crate::file::reader::{FileReader, SerializedFileReader}; - use crate::record::api::{Field, Row}; + use crate::record::api::{Field, Row, RowAccessor, RowFormatter}; use crate::schema::parser::parse_message_type; - use crate::util::test_common::get_test_file; + use crate::util::test_common::{get_test_file, get_test_path}; + use std::convert::TryFrom; // Convenient macros to assemble row, list, map, and group. @@ -1440,6 +1502,62 @@ mod tests { test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap(); } + #[test] + fn test_file_reader_iter() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let vec = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| RowIter::from_file_into(Box::new(r))) + .flat_map(|r| r.get_int(0)) + .collect::>(); + + assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1]); + + Ok(()) + } + + #[test] + fn test_file_reader_iter_projection() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let values = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| { + let schema = "message schema { OPTIONAL INT32 id; }"; + let proj = parse_message_type(&schema).ok(); + + RowIter::from_file_into(Box::new(r)).project(proj).unwrap() + }) + .map(|r| format!("id:{}", r.fmt(0))) + .collect::>() + .join(", "); + + assert_eq!(values, "id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1"); + + Ok(()) + } + + #[test] + fn test_file_reader_iter_projection_err() { + let schema = " + message spark_schema { + REQUIRED INT32 key; + REQUIRED BOOLEAN value; + } + "; + let proj = parse_message_type(&schema).ok(); + let path = get_test_path("nested_maps.snappy.parquet"); + let reader = SerializedFileReader::try_from(path.as_path()).unwrap(); + let res = RowIter::from_file_into(Box::new(reader)).project(proj); + + assert!(res.is_err()); + assert_eq!( + res.err().unwrap(), + general_err!("Root schema does not contain projection") + ); + } + #[test] fn test_tree_reader_handle_repeated_fields_with_no_annotation() { // Array field `phoneNumbers` does not contain LIST annotation.