23 changes: 13 additions & 10 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -76,9 +76,9 @@ pub struct ParquetExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
- predicate: Option<Expr>,
+ predicate: Option<Arc<Expr>>,
Contributor (author): Without these Arcs, these predicates get copied once for each parquet file.

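For intuition, a minimal standalone sketch (not from this PR; Node is a hypothetical stand-in for a large Expr tree): cloning an Arc only bumps a reference count, while cloning the expression itself would copy the whole tree once per file.

use std::sync::Arc;

// Hypothetical stand-in for a deeply nested predicate tree such as Expr.
#[derive(Clone)]
struct Node {
    children: Vec<Node>,
}

fn main() {
    // Build one large "predicate" up front and share it.
    let predicate = Arc::new(Node {
        children: vec![Node { children: Vec::new() }; 1_000],
    });

    // Simulate opening many parquet files: each open clones the handle.
    for _ in 0..100 {
        // Reference-count bump only; the tree itself is never copied.
        let _per_file = Arc::clone(&predicate);
    }
}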
Member:

> these predicates get copied once for each parquet file

Just a question: I can't find where the code clones the predicate for each file 🤔, but this improvement is reasonable 👍.

Contributor (author): I believe that

impl FileOpener for ParquetOpener {
    fn open(
        &self,
        _: Arc<dyn ObjectStore>,
        file_meta: FileMeta,
    ) -> Result<FileOpenFuture> {

which clones the predicates, is called once per file

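As a simplified sketch of that path (the field names are from this PR, but the body below is paraphrased, not the actual implementation), the per-file clones inside open become cheap once the fields are Arc-wrapped:

// Inside ParquetOpener::open, executed once per parquet file (paraphrased):
let predicate = self.predicate.clone();                 // clones the Arc, not the Expr
let pruning_predicate = self.pruning_predicate.clone(); // clones the Arc, not the predicate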
/// Optional predicate for pruning row groups
- pruning_predicate: Option<PruningPredicate>,
+ pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
@@ -106,7 +106,7 @@ impl ParquetExec {
predicate_expr,
base_config.file_schema.clone(),
) {
- Ok(pruning_predicate) => Some(pruning_predicate),
+ Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
@@ -123,6 +123,9 @@
}
});

+ // Save original predicate
+ let predicate = predicate.map(Arc::new);

let (projected_schema, projected_statistics) = base_config.project();

Self {
@@ -143,7 +146,7 @@
}

/// Optional reference to this parquet scan's pruning predicate
- pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
+ pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
self.pruning_predicate.as_ref()
}

@@ -376,8 +379,8 @@ struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
batch_size: usize,
- predicate: Option<Expr>,
- pruning_predicate: Option<PruningPredicate>,
+ predicate: Option<Arc<Expr>>,
+ pruning_predicate: Option<Arc<PruningPredicate>>,
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
@@ -435,7 +438,7 @@ impl FileOpener for ParquetOpener {
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
- predicate.clone(),
+ predicate.as_ref(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
@@ -463,7 +466,7 @@
let row_groups = row_groups::prune_row_groups(
file_metadata.row_groups(),
file_range,
- pruning_predicate.clone(),
+ pruning_predicate.as_ref().map(|p| p.as_ref()),
&file_metrics,
);

@@ -473,7 +476,7 @@
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
.then(|| {
page_filter::build_page_filter(
- pruning_predicate.as_ref(),
+ pruning_predicate.as_ref().map(|p| p.as_ref()),
builder.schema().clone(),
&row_groups,
file_metadata.as_ref(),
@@ -1615,7 +1618,7 @@ mod tests {

// but still has a pushdown predicate
let predicate = rt.parquet_exec.predicate.as_ref();
- assert_eq!(predicate, Some(&filter));
+ assert_eq!(predicate.unwrap().as_ref(), &filter);
}

/// returns the sum of all the metrics with the specified name
datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -25,7 +25,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecurs
use std::collections::BTreeSet;

use datafusion_expr::Expr;
- use datafusion_optimizer::utils::split_conjunction_owned;
+ use datafusion_optimizer::utils::split_conjunction;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -309,7 +309,7 @@ fn columns_sorted(

/// Build a [`RowFilter`] from the given predicate `Expr`
pub fn build_row_filter(
- expr: Expr,
+ expr: &Expr,
file_schema: &Schema,
table_schema: &Schema,
metadata: &ParquetMetaData,
@@ -319,13 +319,13 @@
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;

- let predicates = split_conjunction_owned(expr);
+ let predicates = split_conjunction(expr);

let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.flat_map(|expr| {
if let Ok(candidate) =
- FilterCandidateBuilder::new(expr, file_schema, table_schema)
+ FilterCandidateBuilder::new(expr.clone(), file_schema, table_schema)
.build(metadata)
{
candidate
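A note on the split_conjunction change above: the borrowed variant returns references to the conjuncts of a AND b AND c, so the caller keeps ownership of the original predicate and clones only the conjuncts that actually become filter candidates. A hedged sketch of the pattern, assuming the signatures implied by this hunk (split_conjunction(&Expr) -> Vec<&Expr>; build_candidate is a hypothetical consumer that needs an owned Expr):

// expr stays owned by the caller; nothing is consumed up front.
let conjuncts: Vec<&Expr> = split_conjunction(&expr);
for conjunct in conjuncts {
    // Clone per conjunct, only at the point an owned Expr is required.
    build_candidate(conjunct.clone());
}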
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -39,7 +39,7 @@ use super::ParquetFileMetrics;
pub(crate) fn prune_row_groups(
groups: &[RowGroupMetaData],
range: Option<FileRange>,
- predicate: Option<PruningPredicate>,
+ predicate: Option<&PruningPredicate>,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let mut filtered = Vec::with_capacity(groups.len());
@@ -51,7 +51,7 @@
}
}

- if let Some(predicate) = &predicate {
+ if let Some(predicate) = predicate {
let pruning_stats = RowGroupPruningStatistics {
row_group_metadata: metadata,
parquet_schema: predicate.schema().as_ref(),
@@ -297,7 +297,7 @@ mod tests {

let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -331,7 +331,7 @@
// missing statistics for the first row group mean that the result from the predicate expression
// is null / undefined so the first row group can't be filtered out
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![0, 1]
);
}
@@ -372,7 +372,7 @@
// the first row group is still filtered out because the predicate expression can be partially evaluated
// when conditions are joined using AND
assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);

@@ -384,7 +384,7 @@
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
vec![0, 1]
);
}
@@ -426,7 +426,7 @@
let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -451,7 +451,7 @@
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should both be false
assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -500,7 +500,7 @@
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![0]
);

@@ -556,7 +556,7 @@
prune_row_groups(
&[rgm1, rgm2, rgm3],
None,
- Some(pruning_predicate),
+ Some(&pruning_predicate),
&metrics
),
vec![0, 1]
@@ -597,7 +597,7 @@
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);

@@ -656,7 +656,7 @@
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);
