diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 44911fcf2a9ca..ca4d097c37a44 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::sort::reverse_row_selection; use datafusion_common::{Result, assert_eq_or_internal_err}; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; -use parquet::file::metadata::RowGroupMetaData; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; /// A selection of rows and row groups within a ParquetFile to decode. /// @@ -337,6 +338,64 @@ impl ParquetAccessPlan { pub fn into_inner(self) -> Vec { self.row_groups } + + /// Prepare this plan and resolve to the final `PreparedAccessPlan` + pub(crate) fn prepare( + self, + row_group_meta_data: &[RowGroupMetaData], + ) -> Result { + let row_group_indexes = self.row_group_indexes(); + let row_selection = self.into_overall_row_selection(row_group_meta_data)?; + + PreparedAccessPlan::new(row_group_indexes, row_selection) + } +} + +/// Represents a prepared, fully resolved [`ParquetAccessPlan`] +/// +/// The [`RowSelection`] represents the result of applying all pruning such as +/// user provided scans, Row Group statistics, DataPage statistics, and Bloom +/// Filters. +/// +/// This plan is what is passed to the parquet reader +pub(crate) struct PreparedAccessPlan { + /// Row group indexes to read + pub(crate) row_group_indexes: Vec, + /// Optional row selection for filtering within row groups + pub(crate) row_selection: Option, +} + +impl PreparedAccessPlan { + /// Create a new prepared access plan + fn new( + row_group_indexes: Vec, + row_selection: Option, + ) -> Result { + Ok(Self { + row_group_indexes, + row_selection, + }) + } + + /// Reverse the access plan for reverse scanning + pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { + // Get the row group indexes before reversing + let row_groups_to_scan = self.row_group_indexes.clone(); + + // Reverse the row group indexes + self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect(); + + // If we have a row selection, reverse it to match the new row group order + if let Some(row_selection) = self.row_selection { + self.row_selection = Some(reverse_row_selection( + &row_selection, + file_metadata, + &row_groups_to_scan, // Pass the original (non-reversed) row group indexes + )?); + } + + Ok(self) + } } #[cfg(test)] diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 0d8e825a892c5..bb330c3f4caa1 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -52,7 +52,6 @@ use datafusion_physical_plan::metrics::{ }; use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; -use crate::sort::reverse_row_selection; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] @@ -67,7 +66,7 @@ use parquet::arrow::arrow_reader::{ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { @@ -125,53 +124,6 @@ pub(super) struct ParquetOpener { pub reverse_row_groups: bool, } -/// Represents a prepared access plan with optional row selection -pub(crate) struct PreparedAccessPlan { - /// Row group indexes to read - pub(crate) row_group_indexes: Vec, - /// Optional row selection for filtering within row groups - pub(crate) row_selection: Option, -} - -impl PreparedAccessPlan { - /// Create a new prepared access plan from a ParquetAccessPlan - pub(crate) fn from_access_plan( - access_plan: ParquetAccessPlan, - rg_metadata: &[RowGroupMetaData], - ) -> Result { - let row_group_indexes = access_plan.row_group_indexes(); - let row_selection = access_plan.into_overall_row_selection(rg_metadata)?; - - Ok(Self { - row_group_indexes, - row_selection, - }) - } - - /// Reverse the access plan for reverse scanning - pub(crate) fn reverse( - mut self, - file_metadata: &parquet::file::metadata::ParquetMetaData, - ) -> Result { - // Get the row group indexes before reversing - let row_groups_to_scan = self.row_group_indexes.clone(); - - // Reverse the row group indexes - self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect(); - - // If we have a row selection, reverse it to match the new row group order - if let Some(row_selection) = self.row_selection { - self.row_selection = Some(reverse_row_selection( - &row_selection, - file_metadata, - &row_groups_to_scan, // Pass the original (non-reversed) row group indexes - )?); - } - - Ok(self) - } -} - impl FileOpener for ParquetOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { // ----------------------------------- @@ -545,8 +497,7 @@ impl FileOpener for ParquetOpener { } // Prepare the access plan (extract row groups and row selection) - let mut prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?; + let mut prepared_plan = access_plan.prepare(rg_metadata)?; // ---------------------------------------------------------- // Step: potentially reverse the access plan for performance. diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs index abc50eeb317d5..db22363aa3746 100644 --- a/datafusion/datasource-parquet/src/sort.rs +++ b/datafusion/datasource-parquet/src/sort.rs @@ -122,7 +122,6 @@ pub fn reverse_row_selection( mod tests { use crate::ParquetAccessPlan; use crate::RowGroupAccess; - use crate::opener::PreparedAccessPlan; use arrow::datatypes::{DataType, Field, Schema}; use bytes::Bytes; use parquet::arrow::ArrowWriter; @@ -169,9 +168,9 @@ mod tests { let access_plan = ParquetAccessPlan::new_all(3); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]); @@ -205,9 +204,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -255,9 +254,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -298,9 +297,9 @@ mod tests { } let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let reversed_plan = prepared_plan .reverse(&metadata) @@ -338,9 +337,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -379,9 +378,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -435,9 +434,9 @@ mod tests { access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -502,9 +501,9 @@ mod tests { let rg_metadata = metadata.row_groups(); // Step 1: Create PreparedAccessPlan - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); @@ -594,9 +593,9 @@ mod tests { access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -647,9 +646,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); let original_selected: usize = prepared_plan .row_selection @@ -720,9 +719,9 @@ mod tests { let rg_metadata = metadata.row_groups(); // Step 1: Create PreparedAccessPlan - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original plan in detail assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]); @@ -862,9 +861,9 @@ mod tests { access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)])); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Verify original selection structure in detail let orig_selectors: Vec<_> = prepared_plan @@ -944,9 +943,9 @@ mod tests { ); let rg_metadata = metadata.row_groups(); - let prepared_plan = - PreparedAccessPlan::from_access_plan(access_plan, rg_metadata) - .expect("Failed to create PreparedAccessPlan"); + let prepared_plan = access_plan + .prepare(rg_metadata) + .expect("Failed to create PreparedAccessPlan"); // Original: [0, 2] assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);