diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 7b5557e617d1..2de72a33d378 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -35,6 +35,7 @@ use datafusion::{ }; use datafusion_catalog::memory::DataSourceExec; use datafusion_common::config::ConfigOptions; +use datafusion_common::JoinType; use datafusion_datasource::{ file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, PartitionedFile, }; @@ -55,6 +56,7 @@ use datafusion_physical_plan::{ coalesce_partitions::CoalescePartitionsExec, collect, filter::FilterExec, + joins::{HashJoinExec, PartitionMode}, repartition::RepartitionExec, sorts::sort::SortExec, ExecutionPlan, @@ -2625,3 +2627,52 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { ]; assert_batches_eq!(&expected, &batches); } + +#[test] +fn test_hash_join_dynamic_filter_with_unsupported_scan() { + // This test verifies that HashJoin doesn't create dynamic filters when the probe side + // returns Unsupported (e.g., a scan that can't use filters at all) + let schema = schema(); + + // Build side + let build_scan = TestScanBuilder::new(schema.clone()) + .with_support(true) + .build(); + + // Probe side: scan that doesn't support pushdown + let probe_scan = TestScanBuilder::new(schema.clone()) + .with_support(false) // This will create a test node that doesn't support filter pushdown + .build(); + + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + vec![( + Arc::new(Column::new_with_schema("a", &schema).unwrap()), + Arc::new(Column::new_with_schema("a", &schema).unwrap()), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + let config = SessionConfig::new(); + let rule = FilterPushdown::new_post_optimization(); + let ctx = 
OptimizerContext::new(config.clone()); + let plan = rule.optimize_plan(join, &ctx).unwrap(); + + // Optimized plan should not have a DynamicFilter placeholder in the probe node. + insta::assert_snapshot!( + format_plan_for_test(&plan), + @r" + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index fc2be94b01a6..81d8617944ff 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -214,12 +214,12 @@ impl FileSource for TestSource { ..self.clone() }); Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::Yes; filters.len()], + vec![PushedDown::Exact; filters.len()], ) .with_updated_node(new_node)) } else { Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } } @@ -538,8 +538,8 @@ impl ExecutionPlan for TestNode { let first_pushdown_result = self_pushdown_result[0].clone(); match &first_pushdown_result.discriminant { - PushedDown::No => { - // We have a filter to push down + PushedDown::Unsupported | PushedDown::Inexact => { + // We have a filter that wasn't exactly pushed down, so we need to apply it let new_child = FilterExec::try_new( Arc::clone(&first_pushdown_result.predicate), Arc::clone(&self.input), @@ -551,7 +551,8 @@ impl ExecutionPlan for TestNode { res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } - PushedDown::Yes => { + PushedDown::Exact => { + // Filter was exactly pushed down, no need to apply it again let res = 
FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 5ed74ecfd98f..2641364bb56b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -729,6 +729,9 @@ impl FileSource for ParquetSource { .into_iter() .map(|filter| { if can_expr_be_pushed_down_with_schemas(&filter, table_schema) { + // When pushdown_filters is true, filters will be used for row-level filtering + // When pushdown_filters is false, filters will be used for stats pruning only + // We'll mark them as supported here and adjust later based on pushdown_filters PushedDownPredicate::supported(filter) } else { PushedDownPredicate::unsupported(filter) @@ -737,19 +740,19 @@ impl FileSource for ParquetSource { .collect(); if filters .iter() - .all(|f| matches!(f.discriminant, PushedDown::No)) + .all(|f| matches!(f.discriminant, PushedDown::Unsupported)) { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )); } let allowed_filters = filters .iter() .filter_map(|f| match f.discriminant { - PushedDown::Yes => Some(Arc::clone(&f.predicate)), - PushedDown::No => None, + PushedDown::Exact => Some(Arc::clone(&f.predicate)), + PushedDown::Inexact | PushedDown::Unsupported => None, }) .collect_vec(); let predicate = match source.predicate { @@ -762,13 +765,24 @@ impl FileSource for ParquetSource { source = source.with_pushdown_filters(pushdown_filters); let source = Arc::new(source); // If pushdown_filters is false we tell our parents that they still have to handle the filters, - // even if we updated the predicate to include the filters (they will only be used for stats pruning). 
+ // even though we updated the predicate to include the filters (they will be used for stats pruning only). + // In this case, we return Inexact for filters that can be pushed down (used for stats pruning) + // and Unsupported for filters that cannot be pushed down at all. if !pushdown_filters { + let result_discriminants = filters + .iter() + .map(|f| match f.discriminant { + PushedDown::Exact => PushedDown::Inexact, // Downgrade to Inexact (stats pruning only) + PushedDown::Unsupported => PushedDown::Unsupported, // Keep as Unsupported + PushedDown::Inexact => PushedDown::Inexact, // Already Inexact: nothing to downgrade (exhaustive match, no panic path) + }) + .collect(); return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + result_discriminants, ) .with_updated_node(source)); } + // If pushdown_filters is true, we return the original discriminants (Exact for supported filters) Ok(FilterPushdownPropagation::with_parent_pushdown_result( filters.iter().map(|f| f.discriminant).collect(), ) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 3668e0e4a77e..43d5b38b1b49 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -124,8 +124,9 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + // Default implementation: don't support filter pushdown Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 781083c0f14d..5c8378e55db6 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -186,8 +186,9 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + // Default implementation: don't support filter pushdown Ok(FilterPushdownPropagation::with_parent_pushdown_result( -
vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } } diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 443229a3cb77..3fa602f12554 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -114,10 +114,10 @@ impl Default for SessionConfig { } /// A type map for storing extensions. -/// +/// /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one /// will be kept. -/// +/// /// Extensions are opaque objects that are unknown to DataFusion itself but can be downcast by optimizer rules, /// execution plans, or other components that have access to the session config. /// They provide a flexible way to attach extra data or behavior to the session config. diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 8206d4320872..f4381663b316 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -509,9 +509,10 @@ fn push_down_filters( let mut all_predicates = self_filtered.items().to_vec(); // Apply second filter pass: collect indices of parent filters that can be pushed down - let parent_filters_for_child = parent_filtered - .chain_filter_slice(&parent_filters, |filter| { - matches!(filter.discriminant, PushedDown::Yes) + // Push down filters that are either Exact or Inexact (both mean the child will use them) + let parent_filters_for_child = + parent_filtered.chain_filter_slice(&parent_filters, |filter| { + matches!(filter.discriminant, PushedDown::Exact | PushedDown::Inexact) }); // Add the filtered parent predicates to all_predicates @@ -549,7 +550,7 @@ fn push_down_filters( .collect_vec(); // Map the results from filtered self filters back to their original positions using FilteredVec let mapped_self_results = - self_filtered.map_results_to_original(all_filters, 
PushedDown::No); + self_filtered.map_results_to_original(all_filters, PushedDown::Unsupported); // Wrap each result with its corresponding expression let self_filter_results: Vec<_> = mapped_self_results @@ -562,7 +563,7 @@ fn push_down_filters( // Start by marking all parent filters as unsupported for this child for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { - parent_filter_pushdown_support.push(PushedDown::No); + parent_filter_pushdown_support.push(PushedDown::Unsupported); assert_eq!( parent_filter_pushdown_support.len(), child_idx + 1, @@ -571,7 +572,7 @@ fn push_down_filters( } // Map results from pushed-down filters back to original parent filter indices let mapped_parent_results = parent_filters_for_child - .map_results_to_original(parent_filters, PushedDown::No); + .map_results_to_original(parent_filters, PushedDown::Unsupported); // Update parent_filter_pushdown_supports with the mapped results // mapped_parent_results already has the results at their original indices diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 58185c8cdf5b..8d17b70691f1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -521,10 +521,11 @@ impl ExecutionPlan for FilterExec { if !matches!(phase, FilterPushdownPhase::Pre) { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } - // We absorb any parent filters that were not handled by our children + // We absorb any parent filters that were not handled by our children with exact filtering let unsupported_parent_filters = child_pushdown_result.parent_filters.iter().filter_map(|f| { - matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) + // Keep filters that weren't handled with exact filtering (Inexact or Unsupported) + (!matches!(f.all(), PushedDown::Exact)).then_some(Arc::clone(&f.filter)) }); let unsupported_self_filters = child_pushdown_result .self_filters @@ -532,8 
+533,8 @@ impl ExecutionPlan for FilterExec { .expect("we have exactly one child") .iter() .filter_map(|f| match f.discriminant { - PushedDown::Yes => None, - PushedDown::No => Some(&f.predicate), + PushedDown::Exact => None, + PushedDown::Inexact | PushedDown::Unsupported => Some(&f.predicate), }) .cloned(); @@ -592,8 +593,11 @@ impl ExecutionPlan for FilterExec { Some(Arc::new(new) as _) }; + // FilterExec always applies filters exactly. + // Even if the child returned Inexact or Unsupported, we absorb those filters + // and apply them, so from the parent's perspective, the filter is handled exactly. Ok(FilterPushdownPropagation { - filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], + filters: vec![PushedDown::Exact; child_pushdown_result.parent_filters.len()], updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 1274e954eaeb..2f9d78d8da29 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -101,10 +101,26 @@ impl PushedDownPredicate { self.predicate } - /// Create a new [`PushedDownPredicate`] with supported pushdown. + /// Create a new [`PushedDownPredicate`] with exact pushdown. + /// The child will apply the filter exactly as FilterExec would. + pub fn exact(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::Exact, + predicate, + } + } + + /// Create a new [`PushedDownPredicate`] with supported pushdown (exact filtering). + /// This is an alias for `exact()` for backward compatibility. pub fn supported(predicate: Arc) -> Self { + Self::exact(predicate) + } + + /// Create a new [`PushedDownPredicate`] with inexact pushdown. + /// The child will use the filter for stats/partial pruning but not exact filtering. 
+ pub fn inexact(predicate: Arc) -> Self { Self { - discriminant: PushedDown::Yes, + discriminant: PushedDown::Inexact, predicate, } } @@ -112,35 +128,57 @@ impl PushedDownPredicate { /// Create a new [`PushedDownPredicate`] with unsupported pushdown. pub fn unsupported(predicate: Arc) -> Self { Self { - discriminant: PushedDown::No, + discriminant: PushedDown::Unsupported, predicate, } } } /// Discriminant for the result of pushing down a filter into a child node. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PushedDown { - /// The predicate was successfully pushed down into the child node. - Yes, - /// The predicate could not be pushed down into the child node. - No, + /// The filter is applied exactly as FilterExec would apply it. + /// The parent can safely drop the filter as the child guarantees exact filtering. + /// This is typically used for row-level filtering in data sources or selective operators. + Exact, + /// The filter will be used, but the child cannot guarantee exact filtering. + /// This usually means the filter may be used for statistics-based pruning (e.g., row group + /// filtering in Parquet) but will not be used for row-level filtering. + Inexact, + /// The filter is not going to be used at all by the child. + Unsupported, } impl PushedDown { - /// Logical AND operation: returns `Yes` only if both operands are `Yes`. + /// Logical AND operation for combining filter pushdown results. 
+ /// Returns the most restrictive (least capable) result: + /// - Exact AND Exact = Exact + /// - Exact AND Inexact = Inexact + /// - Exact AND Unsupported = Unsupported + /// - Inexact AND Inexact = Inexact + /// - Inexact AND Unsupported = Unsupported + /// - Unsupported AND Unsupported = Unsupported pub fn and(self, other: PushedDown) -> PushedDown { match (self, other) { - (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes, - _ => PushedDown::No, + (PushedDown::Exact, PushedDown::Exact) => PushedDown::Exact, + (PushedDown::Exact, PushedDown::Inexact) + | (PushedDown::Inexact, PushedDown::Exact) => PushedDown::Inexact, + (PushedDown::Inexact, PushedDown::Inexact) => PushedDown::Inexact, + _ => PushedDown::Unsupported, } } - /// Logical OR operation: returns `Yes` if either operand is `Yes`. + /// Logical OR operation for combining filter pushdown results. + /// Returns the least restrictive (most capable) result: + /// - Exact OR anything = Exact + /// - Inexact OR Inexact = Inexact + /// - Inexact OR Unsupported = Inexact + /// - Unsupported OR Unsupported = Unsupported pub fn or(self, other: PushedDown) -> PushedDown { match (self, other) { - (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes, - (PushedDown::No, PushedDown::No) => PushedDown::No, + (PushedDown::Exact, _) | (_, PushedDown::Exact) => PushedDown::Exact, + (PushedDown::Inexact, _) | (_, PushedDown::Inexact) => PushedDown::Inexact, + (PushedDown::Unsupported, PushedDown::Unsupported) => PushedDown::Unsupported, } } @@ -162,30 +200,36 @@ pub struct ChildFilterPushdownResult { impl ChildFilterPushdownResult { /// Combine all child results using OR logic. - /// Returns `Yes` if **any** child supports the filter. - /// Returns `No` if **all** children reject the filter or if there are no children. + /// Returns the best (most capable) result among all children. 
+ /// - If any child supports Exact, returns Exact + /// - If any child supports Inexact (but none Exact), returns Inexact + /// - If all children are Unsupported, returns Unsupported + /// - If there are no children, returns Unsupported pub fn any(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PushedDown::No + PushedDown::Unsupported } else { self.child_results .iter() - .fold(PushedDown::No, |acc, result| acc.or(*result)) + .fold(PushedDown::Unsupported, |acc, result| acc.or(*result)) } } /// Combine all child results using AND logic. - /// Returns `Yes` if **all** children support the filter. - /// Returns `No` if **any** child rejects the filter or if there are no children. + /// Returns the worst (least capable) result among all children. + /// - If all children support Exact, returns Exact + /// - If all children support Exact or Inexact (but at least one Inexact), returns Inexact + /// - If any child is Unsupported, returns Unsupported + /// - If there are no children, returns Unsupported pub fn all(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PushedDown::No + PushedDown::Unsupported } else { self.child_results .iter() - .fold(PushedDown::Yes, |acc, result| acc.and(*result)) + .fold(PushedDown::Exact, |acc, result| acc.and(*result)) } } } @@ -263,7 +307,7 @@ impl FilterPushdownPropagation { let filters = child_pushdown_result .parent_filters .into_iter() - .map(|_| PushedDown::No) + .map(|_| PushedDown::Unsupported) .collect(); Self { filters, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 03bf516eadd1..937041dac0da 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -24,7 +24,7 @@ use std::{any::Any, vec}; use crate::execution_plan::{boundedness_from_children, 
EmissionType}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + FilterPushdownPropagation, PushedDown, }; use crate::joins::hash_join::shared_bounds::{ ColumnBounds, PartitionBounds, SharedBuildAccumulator, @@ -910,6 +910,11 @@ impl ExecutionPlan for HashJoinExec { consider using CoalescePartitionsExec or the EnforceDistribution rule" ); + // Dynamic filter pushdown is enabled only when this node carries a dynamic filter, + // i.e. `self.dynamic_filter` is `Some`. + // NOTE(review): the session config and whether any consumer still holds a reference + // to the filter are not consulted here -- confirm whether those checks + // should be added, since skipping them may compute a filter nobody uses. let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); @@ -1159,34 +1164,38 @@ impl ExecutionPlan for HashJoinExec { let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child // We expect 0 or 1 self filters if let Some(filter) = right_child_self_filters.first() { - // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said - // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating - let predicate = Arc::clone(&filter.predicate); - if let Ok(dynamic_filter) = - Arc::downcast::(predicate) - { - // We successfully pushed down our self filter - we need to make a new node with the dynamic filter - let new_node = Arc::new(HashJoinExec { - left: Arc::clone(&self.left), - right: Arc::clone(&self.right), - on: self.on.clone(), - filter: self.filter.clone(), - join_type: self.join_type, - join_schema: Arc::clone(&self.join_schema), - left_fut: Arc::clone(&self.left_fut), - random_state: self.random_state.clone(), - mode: self.mode, - metrics: ExecutionPlanMetricsSet::new(), - projection: self.projection.clone(), - column_indices: self.column_indices.clone(), -
null_equality: self.null_equality, - cache: self.cache.clone(), - dynamic_filter: Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }), - }); - result = result.with_updated_node(new_node as Arc); + // Only create the dynamic filter if the probe side will actually use it (Exact or Inexact). + // If it's Unsupported, don't compute the filter since it won't be used. + let will_be_used = !matches!(filter.discriminant, PushedDown::Unsupported); + + if will_be_used { + let predicate = Arc::clone(&filter.predicate); + if let Ok(dynamic_filter) = + Arc::downcast::(predicate) + { + // We successfully pushed down our self filter - we need to make a new node with the dynamic filter + let new_node = Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + dynamic_filter: Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + build_accumulator: OnceLock::new(), + }), + }); + result = result.with_updated_node(new_node as Arc); + } } } Ok(result)