diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d8c116f0135b..66780fcd6003 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -33,7 +33,7 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use crate::column::page::{PageIterator, PageReader}; #[cfg(feature = "encryption")] -use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties}; +use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; @@ -682,13 +682,11 @@ struct ReaderPageIterator { metadata: Arc, } -impl Iterator for ReaderPageIterator { - type Item = Result>; - - fn next(&mut self) -> Option { - let rg_idx = self.row_groups.next()?; +impl ReaderPageIterator { + /// Return the next SerializedPageReader + fn next_page_reader(&mut self, rg_idx: usize) -> Result> { let rg = self.metadata.row_group(rg_idx); - let meta = rg.column(self.column_idx); + let column_chunk_metadata = rg.column(self.column_idx); let offset_index = self.metadata.offset_index(); // `offset_index` may not exist and `i[rg_idx]` will be empty. // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`. @@ -698,32 +696,25 @@ impl Iterator for ReaderPageIterator { let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); - #[cfg(feature = "encryption")] - let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() { - match meta.crypto_metadata() { - Some(crypto_metadata) => { - match CryptoContext::for_column( - file_decryptor, - crypto_metadata, - rg_idx, - self.column_idx, - ) { - Ok(context) => Some(Arc::new(context)), - Err(err) => return Some(Err(err)), - } - } - None => None, - } - } else { - None - }; - - let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations); + SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)? + .add_crypto_context( + rg_idx, + self.column_idx, + self.metadata.as_ref(), + column_chunk_metadata, + ) + } +} - #[cfg(feature = "encryption")] - let ret = ret.map(|reader| reader.with_crypto_context(crypto_context)); +impl Iterator for ReaderPageIterator { + type Item = Result>; - Some(ret.map(|x| Box::new(x) as _)) + fn next(&mut self) -> Option { + let rg_idx = self.row_groups.next()?; + let page_reader = self + .next_page_reader(rg_idx) + .map(|page_reader| Box::new(page_reader) as _); + Some(page_reader) } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 4478162e521e..bd7de6f724fe 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -59,9 +59,6 @@ use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHas mod metadata; pub use metadata::*; -#[cfg(feature = "encryption")] -use crate::encryption::decrypt::CryptoContext; - #[cfg(feature = "object_store")] mod store; @@ -1027,6 +1024,7 @@ impl RowGroups for InMemoryRowGroup<'_> { self.row_count } + /// Return chunks for column i fn column_chunks(&self, i: usize) -> Result> { match &self.column_chunks[i] { None => Err(ParquetError::General(format!( @@ -1038,31 +1036,19 @@ impl RowGroups for InMemoryRowGroup<'_> { // filter out empty offset indexes (old versions specified Some(vec![]) when no present) .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let column_metadata = self.metadata.row_group(self.row_group_idx).column(i); + let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i); let page_reader = SerializedPageReader::new( data.clone(), - column_metadata, + column_chunk_metadata, self.row_count, page_locations, )?; - - #[cfg(feature = "encryption")] - let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() { - match column_metadata.crypto_metadata() { - Some(crypto_metadata) => Some(Arc::new(CryptoContext::for_column( - file_decryptor, - crypto_metadata, - self.row_group_idx, - i, - )?)), - None => None, - } - } else { - None - }; - - #[cfg(feature = "encryption")] - let page_reader = page_reader.with_crypto_context(crypto_context); + let page_reader = page_reader.add_crypto_context( + self.row_group_idx, + i, + self.metadata, + column_chunk_metadata, + )?; let page_reader: Box = Box::new(page_reader); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 2673f4ac52a7..183c481b242c 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -571,22 +571,51 @@ impl SerializedPageReader { /// Creates a new serialized page reader from a chunk reader and metadata pub fn new( reader: Arc, - meta: &ColumnChunkMetaData, + column_chunk_metadata: &ColumnChunkMetaData, total_rows: usize, page_locations: Option>, ) -> Result { let props = Arc::new(ReaderProperties::builder().build()); - SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props) + SerializedPageReader::new_with_properties( + reader, + column_chunk_metadata, + total_rows, + page_locations, + props, + ) } - /// Adds cryptographical information to the reader. + /// Stub No-op implementation when encryption is disabled. + #[cfg(all(feature = "arrow", not(feature = "encryption")))] + pub(crate) fn add_crypto_context( + self, + _rg_idx: usize, + _column_idx: usize, + _parquet_meta_data: &ParquetMetaData, + _column_chunk_metadata: &ColumnChunkMetaData, + ) -> Result> { + Ok(self) + } + + /// Adds any necessary crypto context to this page reader, if encryption is enabled. #[cfg(feature = "encryption")] - pub(crate) fn with_crypto_context( + pub(crate) fn add_crypto_context( mut self, - crypto_context: Option>, - ) -> Self { - self.crypto_context = crypto_context; - self + rg_idx: usize, + column_idx: usize, + parquet_meta_data: &ParquetMetaData, + column_chunk_metadata: &ColumnChunkMetaData, + ) -> Result> { + let Some(file_decryptor) = parquet_meta_data.file_decryptor() else { + return Ok(self); + }; + let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else { + return Ok(self); + }; + let crypto_context = + CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?; + self.crypto_context = Some(Arc::new(crypto_context)); + Ok(self) } /// Creates a new serialized page with custom options.