From 62eeb30990fb68d0346f39fbe9e1c8316b0f9be3 Mon Sep 17 00:00:00 2001 From: "Fabio B. Silva" Date: Wed, 15 May 2019 18:34:58 -0400 Subject: [PATCH 1/3] ARROW-5317: [Rust] [Parquet] impl IntoIterator for SerializedFileReader --- rust/parquet/src/file/mod.rs | 20 ++- rust/parquet/src/file/reader.rs | 30 ++++- rust/parquet/src/record/reader.rs | 204 ++++++++++++++++++++++++++---- 3 files changed, 228 insertions(+), 26 deletions(-) diff --git a/rust/parquet/src/file/mod.rs b/rust/parquet/src/file/mod.rs index 407a97d5d6e..6dbf131b9c3 100644 --- a/rust/parquet/src/file/mod.rs +++ b/rust/parquet/src/file/mod.rs @@ -77,7 +77,25 @@ //! assert_eq!(row_group_reader.num_columns(), 1); //! } //! ``` - +//! # Example of reading multiple files +//! +//! ```rust,no_run +//! use parquet::file::reader::SerializedFileReader; +//! use std::convert::TryFrom; +//! +//! let paths = vec![ +//! "/path/to/sample.parquet/part-1.snappy.parquet", +//! "/path/to/sample.parquet/part-2.snappy.parquet" +//! ]; +//! // Create a reader for each file and flat map rows +//! let rows = paths.iter() +//! .map(|p| SerializedFileReader::try_from(*p).unwrap()) +//! .flat_map(|r| r.into_iter()); +//! +//! for row in rows { +//! println!("{}", row); +//! } +//! ``` pub mod metadata; pub mod properties; pub mod reader; diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index aa9d03dcae0..2abaca0d539 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -41,7 +41,8 @@ use crate::column::{ use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC}; -use crate::record::reader::RowIter; +use crate::record::reader::{Iter, RowIter}; +use crate::record::Row; use crate::schema::types::{ self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, }; @@ -313,6 +314,15 @@ impl<'a> TryFrom<&'a str> for SerializedFileReader { } } +impl IntoIterator for SerializedFileReader { + type Item = Row; + type IntoIter = Iter>; + + fn into_iter(self) -> Self::IntoIter { + Iter::from(self) + } +} + /// A serialized implementation for Parquet [`RowGroupReader`]. pub struct SerializedRowGroupReader { buf: BufReader, @@ -639,6 +649,7 @@ mod tests { use parquet_format::TypeDefinedOrder; use crate::basic::SortOrder; + use crate::record::RowAccessor; use crate::util::test_common::{get_temp_file, get_test_file, get_test_path}; #[test] @@ -796,6 +807,23 @@ mod tests { assert!(reader.is_err()); } + #[test] + fn test_file_reader_into_iter() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let mut vec = vec![path.clone(), path.clone()] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| r.into_iter()) + .flat_map(|r| r.get_int(0)) + .collect::>(); + + vec.sort(); + + assert_eq!(vec, vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7]); + + Ok(()) + } + #[test] fn test_reuse_file_chunk() { // This test covers the case of maintaining the correct start position in a file diff --git a/rust/parquet/src/record/reader.rs b/rust/parquet/src/record/reader.rs index fa8f9b5b5fb..c8725449bc1 100644 --- a/rust/parquet/src/record/reader.rs +++ b/rust/parquet/src/record/reader.rs @@ -612,6 +612,26 @@ impl fmt::Display for Reader { // ---------------------------------------------------------------------- // Row iterators +/// Helper method to get schema descriptor for projected schema. +/// If projection is None, then full schema is returned. +#[inline] +fn get_projection_descr( + proj: Option, + root_descr: SchemaDescPtr, +) -> Result { + match proj { + Some(projection) => { + // check if projection is part of file schema + let root_schema = root_descr.root_schema(); + if !root_schema.check_contains(&projection) { + return Err(general_err!("Root schema does not contain projection")); + } + Ok(Rc::new(SchemaDescriptor::new(Rc::new(projection)))) + } + None => Ok(root_descr), + } +} + /// Iterator of [`Row`](crate::record::api::Row)s. /// It is used either for a single row group to iterate over data in that row group, or /// an entire file with auto buffering of all row groups. @@ -628,7 +648,7 @@ impl<'a> RowIter<'a> { /// Creates iterator of [`Row`](crate::record::api::Row)s for all row groups in a /// file. pub fn from_file(proj: Option, reader: &'a FileReader) -> Result { - let descr = Self::get_proj_descr( + let descr = get_projection_descr( proj, reader.metadata().file_metadata().schema_descr_ptr(), )?; @@ -649,7 +669,7 @@ impl<'a> RowIter<'a> { proj: Option, reader: &'a RowGroupReader, ) -> Result { - let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?; + let descr = get_projection_descr(proj, reader.metadata().schema_descr_ptr())?; let tree_builder = Self::tree_builder(); let row_iter = tree_builder.as_iter(descr.clone(), reader); @@ -671,26 +691,6 @@ impl<'a> RowIter<'a> { fn tree_builder() -> TreeBuilder { TreeBuilder::new() } - - /// Helper method to get schema descriptor for projected schema. - /// If projection is None, then full schema is returned. - #[inline] - fn get_proj_descr( - proj: Option, - root_descr: SchemaDescPtr, - ) -> Result { - match proj { - Some(projection) => { - // check if projection is part of file schema - let root_schema = root_descr.root_schema(); - if !root_schema.check_contains(&projection) { - return Err(general_err!("Root schema does not contain projection")); - } - Ok(Rc::new(SchemaDescriptor::new(Rc::new(projection)))) - } - None => Ok(root_descr), - } - } } impl<'a> Iterator for RowIter<'a> { @@ -723,6 +723,103 @@ impl<'a> Iterator for RowIter<'a> { } } +/// Iterator of [`Row`](crate::record::api::Row)s (over all row groups). +/// Takes ownership of the [`FileReader`](crate::file::reader::FileReader). +pub struct Iter { + reader: T, + descr: SchemaDescPtr, + num_row_groups: usize, + current_row_group: usize, + tree_builder: TreeBuilder, + row_iter: Option, +} + +impl Iter +where + T: FileReader, +{ + /// Creates a new iterator of [`Row`](crate::record::api::Row)s. + fn new(reader: T, descr: SchemaDescPtr) -> Self { + let num_row_groups = reader.num_row_groups(); + let tree_builder = TreeBuilder::new(); + + Self { + descr, + reader, + tree_builder, + num_row_groups, + row_iter: None, + current_row_group: 0, + } + } + + /// Tries to create a iterator of [`Row`](crate::record::api::Row)s + /// for a [`FileReader`](crate::file::reader::FileReader) using projections. + /// + /// The Projected schema can be a subset of or equal to the file schema, + /// when it is None, full file schema is assumed. + pub fn from_projection(reader: T, proj: Option) -> Result { + let meta = reader.metadata().file_metadata(); + let schema = meta.schema_descr_ptr(); + let descr = get_projection_descr(proj, schema)?; + + Ok(Self::new(reader, descr)) + } +} + +/// Iterator of [`Row`](crate::record::api::Row)s using the full file schema. +impl From for Iter +where + T: FileReader, +{ + fn from(reader: T) -> Self { + let meta = reader.metadata().file_metadata(); + let descr = meta.schema_descr_ptr(); + + Self::new(reader, descr) + } +} + +impl Iterator for Iter +where + T: FileReader, +{ + type Item = Row; + + fn next(&mut self) -> Option { + let mut row = None; + + if let Some(ref mut iter) = self.row_iter { + row = iter.next(); + } + + if row.is_some() { + return row; + } + + if self.current_row_group >= self.num_row_groups { + return None; + } + + // We do not expect any failures when accessing a row group. + let row_group_reader = &*self + .reader + .get_row_group(self.current_row_group) + .expect("Row group is required to advance"); + + let mut iter = self + .tree_builder + .as_iter(self.descr.clone(), row_group_reader); + + row = iter.next(); + + self.row_iter = Some(iter); + self.current_row_group += 1; + + row + } +} + /// Internal iterator of [`Row`](crate::record::api::Row)s for a reader. pub struct ReaderIter { root_reader: Reader, @@ -759,9 +856,10 @@ mod tests { use crate::errors::{ParquetError, Result}; use crate::file::reader::{FileReader, SerializedFileReader}; - use crate::record::api::{Field, Row}; + use crate::record::api::{Field, Row, RowAccessor, RowFormatter}; use crate::schema::parser::parse_message_type; - use crate::util::test_common::get_test_file; + use crate::util::test_common::{get_test_file, get_test_path}; + use std::convert::TryFrom; // Convenient macros to assemble row, list, map, and group. @@ -1440,6 +1538,64 @@ mod tests { test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap(); } + #[test] + fn test_file_reader_iter() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let mut vec = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| Iter::from(r)) + .flat_map(|r| r.get_int(0)) + .collect::>(); + + vec.sort(); + + assert_eq!(vec, vec![0, 1, 2, 3, 4, 5, 6, 7]); + + Ok(()) + } + + #[test] + fn test_file_reader_iter_projection() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let values = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| { + let schema = "message schema { OPTIONAL INT32 id; }"; + let proj = parse_message_type(&schema).ok(); + + Iter::from_projection(r, proj).unwrap() + }) + .map(|r| format!("id:{}", r.fmt(0))) + .collect::>() + .join(", "); + + assert_eq!(values, "id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1"); + + Ok(()) + } + + #[test] + fn test_file_reader_iter_projection_err() { + let schema = " + message spark_schema { + REQUIRED INT32 key; + REQUIRED BOOLEAN value; + } + "; + let proj = parse_message_type(&schema).ok(); + let path = get_test_path("nested_maps.snappy.parquet"); + let reader = SerializedFileReader::try_from(path.as_path()).unwrap(); + let res = Iter::from_projection(reader, proj); + + assert!(res.is_err()); + assert_eq!( + res.err().unwrap(), + general_err!("Root schema does not contain projection") + ); + } + #[test] fn test_tree_reader_handle_repeated_fields_with_no_annotation() { // Array field `phoneNumbers` does not contain LIST annotation. From 4643440c73df7e4b635c704590e617776877576a Mon Sep 17 00:00:00 2001 From: "Fabio B. Silva" Date: Thu, 16 May 2019 17:55:36 -0400 Subject: [PATCH 2/3] ARROW-5317: [Rust] [Parquet] - Not to sort parquet values before assertions --- rust/parquet/src/file/reader.rs | 8 ++++---- rust/parquet/src/record/reader.rs | 6 ++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index 2abaca0d539..8a47b248cc0 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -810,16 +810,16 @@ mod tests { #[test] fn test_file_reader_into_iter() -> Result<()> { let path = get_test_path("alltypes_plain.parquet"); - let mut vec = vec![path.clone(), path.clone()] + let vec = vec![path.clone(), path.clone()] .iter() .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) .flat_map(|r| r.into_iter()) .flat_map(|r| r.get_int(0)) .collect::>(); - vec.sort(); - - assert_eq!(vec, vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7]); + // rows in the parquet file are not sorted by "id" + // each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1] + assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]); Ok(()) } diff --git a/rust/parquet/src/record/reader.rs b/rust/parquet/src/record/reader.rs index c8725449bc1..2715d4efd57 100644 --- a/rust/parquet/src/record/reader.rs +++ b/rust/parquet/src/record/reader.rs @@ -1541,16 +1541,14 @@ mod tests { #[test] fn test_file_reader_iter() -> Result<()> { let path = get_test_path("alltypes_plain.parquet"); - let mut vec = vec![path] + let vec = vec![path] .iter() .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) .flat_map(|r| Iter::from(r)) .flat_map(|r| r.get_int(0)) .collect::>(); - vec.sort(); - - assert_eq!(vec, vec![0, 1, 2, 3, 4, 5, 6, 7]); + assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1]); Ok(()) } From 7817ee76e8cbe5dde4b675dd6dce010fd9dfcc28 Mon Sep 17 00:00:00 2001 From: FabioBatSilva Date: Fri, 17 May 2019 22:52:33 -0400 Subject: [PATCH 3/3] ARROW-5317: [Rust] [Parquet] - Use existing RowIter --- rust/parquet/src/file/reader.rs | 33 +++- rust/parquet/src/record/reader.rs | 254 +++++++++++++----------------- 2 files changed, 139 insertions(+), 148 deletions(-) diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index 8a47b248cc0..c2bfcfcd21e 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -41,7 +41,7 @@ use crate::column::{ use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC}; -use crate::record::reader::{Iter, RowIter}; +use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::{ self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, @@ -314,12 +314,14 @@ impl<'a> TryFrom<&'a str> for SerializedFileReader { } } +/// Conversion into a [`RowIter`](crate::record::reader::RowIter) +/// using the full file schema over all row groups. impl IntoIterator for SerializedFileReader { type Item = Row; - type IntoIter = Iter>; + type IntoIter = RowIter<'static>; fn into_iter(self) -> Self::IntoIter { - Iter::from(self) + RowIter::from_file_into(Box::new(self)) } } @@ -650,6 +652,7 @@ mod tests { use crate::basic::SortOrder; use crate::record::RowAccessor; + use crate::schema::parser::parse_message_type; use crate::util::test_common::{get_temp_file, get_test_file, get_test_path}; #[test] @@ -824,6 +827,30 @@ mod tests { Ok(()) } + #[test] + fn test_file_reader_into_iter_project() -> Result<()> { + let path = get_test_path("alltypes_plain.parquet"); + let result = vec![path] + .iter() + .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) + .flat_map(|r| { + let schema = "message schema { OPTIONAL INT32 id; }"; + let proj = parse_message_type(&schema).ok(); + + r.into_iter().project(proj).unwrap() + }) + .map(|r| format!("{}", r)) + .collect::>() + .join(","); + + assert_eq!( + result, + "{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}" + ); + + Ok(()) + } + #[test] fn test_reuse_file_chunk() { // This test covers the case of maintaining the correct start position in a file diff --git a/rust/parquet/src/record/reader.rs b/rust/parquet/src/record/reader.rs index 2715d4efd57..4f225d9918f 100644 --- a/rust/parquet/src/record/reader.rs +++ b/rust/parquet/src/record/reader.rs @@ -612,23 +612,19 @@ impl fmt::Display for Reader { // ---------------------------------------------------------------------- // Row iterators -/// Helper method to get schema descriptor for projected schema. -/// If projection is None, then full schema is returned. -#[inline] -fn get_projection_descr( - proj: Option, - root_descr: SchemaDescPtr, -) -> Result { - match proj { - Some(projection) => { - // check if projection is part of file schema - let root_schema = root_descr.root_schema(); - if !root_schema.check_contains(&projection) { - return Err(general_err!("Root schema does not contain projection")); - } - Ok(Rc::new(SchemaDescriptor::new(Rc::new(projection)))) +/// The enum Either with variants That represet a reference and a box of +/// [`FileReader`](crate::file::reader::FileReader). +enum Either<'a> { + Left(&'a FileReader), + Right(Box), +} + +impl<'a> Either<'a> { + fn reader(&self) -> &FileReader { + match *self { + Either::Left(r) => r, + Either::Right(ref r) => &**r, } - None => Ok(root_descr), } } @@ -638,30 +634,45 @@ fn get_projection_descr( pub struct RowIter<'a> { descr: SchemaDescPtr, tree_builder: TreeBuilder, - file_reader: Option<&'a FileReader>, + file_reader: Option>, current_row_group: usize, num_row_groups: usize, row_iter: Option, } impl<'a> RowIter<'a> { + /// Creates a new iterator of [`Row`](crate::record::api::Row)s. + fn new( + file_reader: Option>, + row_iter: Option, + descr: SchemaDescPtr, + ) -> Self { + let tree_builder = Self::tree_builder(); + let num_row_groups = match file_reader { + Some(ref r) => r.reader().num_row_groups(), + None => 0, + }; + + Self { + descr, + file_reader, + tree_builder, + num_row_groups, + row_iter: row_iter, + current_row_group: 0, + } + } + /// Creates iterator of [`Row`](crate::record::api::Row)s for all row groups in a /// file. pub fn from_file(proj: Option, reader: &'a FileReader) -> Result { - let descr = get_projection_descr( + let either = Either::Left(reader); + let descr = Self::get_proj_descr( proj, reader.metadata().file_metadata().schema_descr_ptr(), )?; - let num_row_groups = reader.num_row_groups(); - Ok(Self { - descr, - tree_builder: Self::tree_builder(), - file_reader: Some(reader), - current_row_group: 0, - num_row_groups, - row_iter: None, - }) + Ok(Self::new(Some(either), None, descr)) } /// Creates iterator of [`Row`](crate::record::api::Row)s for a specific row group. @@ -669,20 +680,67 @@ impl<'a> RowIter<'a> { proj: Option, reader: &'a RowGroupReader, ) -> Result { - let descr = get_projection_descr(proj, reader.metadata().schema_descr_ptr())?; + let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?; let tree_builder = Self::tree_builder(); let row_iter = tree_builder.as_iter(descr.clone(), reader); // For row group we need to set `current_row_group` >= `num_row_groups`, because // we only have one row group and can't buffer more. - Ok(Self { - descr, - tree_builder, - file_reader: None, - current_row_group: 0, - num_row_groups: 0, - row_iter: Some(row_iter), - }) + Ok(Self::new(None, Some(row_iter), descr)) + } + + /// Creates a iterator of [`Row`](crate::record::api::Row)s from a + /// [`FileReader`](crate::file::reader::FileReader) using the full file schema. + pub fn from_file_into(reader: Box) -> Self { + let either = Either::Right(reader); + let descr = either + .reader() + .metadata() + .file_metadata() + .schema_descr_ptr(); + + Self::new(Some(either), None, descr) + } + + /// Tries to create a iterator of [`Row`](crate::record::api::Row)s using projections. + /// Returns a error if a file reader is not the source of this iterator. + /// + /// The Projected schema can be a subset of or equal to the file schema, + /// when it is None, full file schema is assumed. + pub fn project(self, proj: Option) -> Result { + match self.file_reader { + Some(ref either) => { + let schema = either + .reader() + .metadata() + .file_metadata() + .schema_descr_ptr(); + let descr = Self::get_proj_descr(proj, schema)?; + + Ok(Self::new(self.file_reader, None, descr)) + } + None => Err(general_err!("File reader is required to use projections")), + } + } + + /// Helper method to get schema descriptor for projected schema. + /// If projection is None, then full schema is returned. + #[inline] + fn get_proj_descr( + proj: Option, + root_descr: SchemaDescPtr, + ) -> Result { + match proj { + Some(projection) => { + // check if projection is part of file schema + let root_schema = root_descr.root_schema(); + if !root_schema.check_contains(&projection) { + return Err(general_err!("Root schema does not contain projection")); + } + Ok(Rc::new(SchemaDescriptor::new(Rc::new(projection)))) + } + None => Ok(root_descr), + } } /// Returns common tree builder, so the same settings are applied to both iterators @@ -705,117 +763,23 @@ impl<'a> Iterator for RowIter<'a> { while row.is_none() && self.current_row_group < self.num_row_groups { // We do not expect any failures when accessing a row group, and file reader // must be set for selecting next row group. - let row_group_reader = &*self - .file_reader - .as_ref() - .expect("File reader is required to advance row group") - .get_row_group(self.current_row_group) - .unwrap(); - self.current_row_group += 1; - let mut iter = self - .tree_builder - .as_iter(self.descr.clone(), row_group_reader); - row = iter.next(); - self.row_iter = Some(iter); - } + if let Some(ref either) = self.file_reader { + let file_reader = either.reader(); + let row_group_reader = &*file_reader + .get_row_group(self.current_row_group) + .expect("Row group is required to advance"); - row - } -} + let mut iter = self + .tree_builder + .as_iter(self.descr.clone(), row_group_reader); -/// Iterator of [`Row`](crate::record::api::Row)s (over all row groups). -/// Takes ownership of the [`FileReader`](crate::file::reader::FileReader). -pub struct Iter { - reader: T, - descr: SchemaDescPtr, - num_row_groups: usize, - current_row_group: usize, - tree_builder: TreeBuilder, - row_iter: Option, -} - -impl Iter -where - T: FileReader, -{ - /// Creates a new iterator of [`Row`](crate::record::api::Row)s. - fn new(reader: T, descr: SchemaDescPtr) -> Self { - let num_row_groups = reader.num_row_groups(); - let tree_builder = TreeBuilder::new(); - - Self { - descr, - reader, - tree_builder, - num_row_groups, - row_iter: None, - current_row_group: 0, - } - } + row = iter.next(); - /// Tries to create a iterator of [`Row`](crate::record::api::Row)s - /// for a [`FileReader`](crate::file::reader::FileReader) using projections. - /// - /// The Projected schema can be a subset of or equal to the file schema, - /// when it is None, full file schema is assumed. - pub fn from_projection(reader: T, proj: Option) -> Result { - let meta = reader.metadata().file_metadata(); - let schema = meta.schema_descr_ptr(); - let descr = get_projection_descr(proj, schema)?; - - Ok(Self::new(reader, descr)) - } -} - -/// Iterator of [`Row`](crate::record::api::Row)s using the full file schema. -impl From for Iter -where - T: FileReader, -{ - fn from(reader: T) -> Self { - let meta = reader.metadata().file_metadata(); - let descr = meta.schema_descr_ptr(); - - Self::new(reader, descr) - } -} - -impl Iterator for Iter -where - T: FileReader, -{ - type Item = Row; - - fn next(&mut self) -> Option { - let mut row = None; - - if let Some(ref mut iter) = self.row_iter { - row = iter.next(); - } - - if row.is_some() { - return row; - } - - if self.current_row_group >= self.num_row_groups { - return None; + self.current_row_group += 1; + self.row_iter = Some(iter); + } } - // We do not expect any failures when accessing a row group. - let row_group_reader = &*self - .reader - .get_row_group(self.current_row_group) - .expect("Row group is required to advance"); - - let mut iter = self - .tree_builder - .as_iter(self.descr.clone(), row_group_reader); - - row = iter.next(); - - self.row_iter = Some(iter); - self.current_row_group += 1; - row } } @@ -1544,7 +1508,7 @@ mod tests { let vec = vec![path] .iter() .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) - .flat_map(|r| Iter::from(r)) + .flat_map(|r| RowIter::from_file_into(Box::new(r))) .flat_map(|r| r.get_int(0)) .collect::>(); @@ -1563,7 +1527,7 @@ mod tests { let schema = "message schema { OPTIONAL INT32 id; }"; let proj = parse_message_type(&schema).ok(); - Iter::from_projection(r, proj).unwrap() + RowIter::from_file_into(Box::new(r)).project(proj).unwrap() }) .map(|r| format!("id:{}", r.fmt(0))) .collect::>() @@ -1585,7 +1549,7 @@ mod tests { let proj = parse_message_type(&schema).ok(); let path = get_test_path("nested_maps.snappy.parquet"); let reader = SerializedFileReader::try_from(path.as_path()).unwrap(); - let res = Iter::from_projection(reader, proj); + let res = RowIter::from_file_into(Box::new(reader)).project(proj); assert!(res.is_err()); assert_eq!(