Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::sort::reverse_row_selection;
use datafusion_common::{Result, assert_eq_or_internal_err};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

/// A selection of rows and row groups within a ParquetFile to decode.
///
Expand Down Expand Up @@ -337,6 +338,64 @@ impl ParquetAccessPlan {
pub fn into_inner(self) -> Vec<RowGroupAccess> {
self.row_groups
}

/// Prepare this plan and resolve to the final `PreparedAccessPlan`
///
/// Takes the plan by value and pairs the indexes returned by
/// `row_group_indexes()` with the overall row selection computed by
/// `into_overall_row_selection` against `row_group_meta_data`.
///
/// # Errors
///
/// Propagates any error returned by `into_overall_row_selection` or by
/// `PreparedAccessPlan::new`.
pub(crate) fn prepare(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice addition

self,
row_group_meta_data: &[RowGroupMetaData],
) -> Result<PreparedAccessPlan> {
// Collect the indexes first: the next call presumably consumes `self`
// (per its `into_` name), so they could not be read afterwards.
let row_group_indexes = self.row_group_indexes();
let row_selection = self.into_overall_row_selection(row_group_meta_data)?;

PreparedAccessPlan::new(row_group_indexes, row_selection)
}
}

/// Represents a prepared, fully resolved [`ParquetAccessPlan`]
///
/// The [`RowSelection`] represents the result of applying all pruning such as
/// user provided scans, Row Group statistics, DataPage statistics, and Bloom
/// Filters.
///
/// This plan is what is passed to the parquet reader
///
/// `Debug` is derived so a plan can be logged and shown in assertion
/// failure messages.
#[derive(Debug)]
pub(crate) struct PreparedAccessPlan {
    /// Row group indexes to read
    pub(crate) row_group_indexes: Vec<usize>,
    /// Optional row selection for filtering within row groups
    pub(crate) row_selection: Option<RowSelection>,
}

impl PreparedAccessPlan {
    /// Create a new prepared access plan
    ///
    /// Currently always returns `Ok`; the `Result` return type is kept so
    /// callers can keep using `?`.
    fn new(
        row_group_indexes: Vec<usize>,
        row_selection: Option<RowSelection>,
    ) -> Result<Self> {
        Ok(Self {
            row_group_indexes,
            row_selection,
        })
    }

    /// Reverse the access plan for reverse scanning
    ///
    /// Reverses the order of `row_group_indexes` and, when a row selection
    /// is present, replaces it with the result of [`reverse_row_selection`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`reverse_row_selection`].
    pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
        // Reverse the selection first, while `row_group_indexes` is still in
        // its original order: `reverse_row_selection` is given the original
        // (non-reversed) row group indexes. Borrowing them here avoids
        // cloning the vector.
        if let Some(row_selection) = self.row_selection.take() {
            self.row_selection = Some(reverse_row_selection(
                &row_selection,
                file_metadata,
                &self.row_group_indexes,
            )?);
        }

        // Now flip the row group order in place (no reallocation).
        self.row_group_indexes.reverse();

        Ok(self)
    }
}

#[cfg(test)]
Expand Down
53 changes: 2 additions & 51 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use datafusion_physical_plan::metrics::{
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};

use crate::sort::reverse_row_selection;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
Expand All @@ -67,7 +66,7 @@ use parquet::arrow::arrow_reader::{
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
Expand Down Expand Up @@ -125,53 +124,6 @@ pub(super) struct ParquetOpener {
pub reverse_row_groups: bool,
}

/// Represents a prepared access plan with optional row selection
///
/// `Debug` is derived so a plan can be logged and shown in assertion
/// failure messages.
#[derive(Debug)]
pub(crate) struct PreparedAccessPlan {
    /// Row group indexes to read
    pub(crate) row_group_indexes: Vec<usize>,
    /// Optional row selection for filtering within row groups
    pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
}

impl PreparedAccessPlan {
    /// Create a new prepared access plan from a ParquetAccessPlan
    ///
    /// Pairs the indexes returned by `row_group_indexes()` with the overall
    /// row selection computed by `into_overall_row_selection` against
    /// `rg_metadata`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `into_overall_row_selection`.
    pub(crate) fn from_access_plan(
        access_plan: ParquetAccessPlan,
        rg_metadata: &[RowGroupMetaData],
    ) -> Result<Self> {
        // Read the indexes before the plan is handed to the next call.
        let row_group_indexes = access_plan.row_group_indexes();
        let row_selection = access_plan.into_overall_row_selection(rg_metadata)?;

        Ok(Self {
            row_group_indexes,
            row_selection,
        })
    }

    /// Reverse the access plan for reverse scanning
    ///
    /// Reverses the order of `row_group_indexes` and, when a row selection
    /// is present, replaces it with the result of `reverse_row_selection`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `reverse_row_selection`.
    pub(crate) fn reverse(
        mut self,
        file_metadata: &parquet::file::metadata::ParquetMetaData,
    ) -> Result<Self> {
        // Reverse the selection first, while `row_group_indexes` is still in
        // its original order: `reverse_row_selection` is given the original
        // (non-reversed) row group indexes. Borrowing them here avoids
        // cloning the vector.
        if let Some(row_selection) = self.row_selection.take() {
            self.row_selection = Some(reverse_row_selection(
                &row_selection,
                file_metadata,
                &self.row_group_indexes,
            )?);
        }

        // Now flip the row group order in place (no reallocation).
        self.row_group_indexes.reverse();

        Ok(self)
    }
}

impl FileOpener for ParquetOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
// -----------------------------------
Expand Down Expand Up @@ -545,8 +497,7 @@ impl FileOpener for ParquetOpener {
}

// Prepare the access plan (extract row groups and row selection)
let mut prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
let mut prepared_plan = access_plan.prepare(rg_metadata)?;

// ----------------------------------------------------------
// Step: potentially reverse the access plan for performance.
Expand Down
79 changes: 39 additions & 40 deletions datafusion/datasource-parquet/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ pub fn reverse_row_selection(
mod tests {
use crate::ParquetAccessPlan;
use crate::RowGroupAccess;
use crate::opener::PreparedAccessPlan;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
Expand Down Expand Up @@ -169,9 +168,9 @@ mod tests {
let access_plan = ParquetAccessPlan::new_all(3);
let rg_metadata = metadata.row_groups();

let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

// Verify original plan
assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]);
Expand Down Expand Up @@ -205,9 +204,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -255,9 +254,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -298,9 +297,9 @@ mod tests {
}

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let reversed_plan = prepared_plan
.reverse(&metadata)
Expand Down Expand Up @@ -338,9 +337,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -379,9 +378,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -435,9 +434,9 @@ mod tests {
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -502,9 +501,9 @@ mod tests {
let rg_metadata = metadata.row_groups();

// Step 1: Create PreparedAccessPlan
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

// Verify original plan
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
Expand Down Expand Up @@ -594,9 +593,9 @@ mod tests {
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -647,9 +646,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

let original_selected: usize = prepared_plan
.row_selection
Expand Down Expand Up @@ -720,9 +719,9 @@ mod tests {
let rg_metadata = metadata.row_groups();

// Step 1: Create PreparedAccessPlan
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

// Verify original plan in detail
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
Expand Down Expand Up @@ -862,9 +861,9 @@ mod tests {
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

// Verify original selection structure in detail
let orig_selectors: Vec<_> = prepared_plan
Expand Down Expand Up @@ -944,9 +943,9 @@ mod tests {
);

let rg_metadata = metadata.row_groups();
let prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
.expect("Failed to create PreparedAccessPlan");
let prepared_plan = access_plan
.prepare(rg_metadata)
.expect("Failed to create PreparedAccessPlan");

// Original: [0, 2]
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
Expand Down
Loading