From 3b3189365c941153840f3a2fa719023ea4113d72 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Wed, 26 Nov 2025 11:12:07 +0100 Subject: [PATCH 1/4] Compute Dynamic Filters only when a consumer supports them --- .../src/expressions/dynamic_filters.rs | 53 +++++++++++++++++++ .../physical-plan/src/joins/hash_join/exec.rs | 16 +++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 43a242472bec..8369e4725b6e 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -294,6 +294,16 @@ impl DynamicFilterPhysicalExpr { .await; } + /// Check if this dynamic filter is being actively used by any consumers. + /// + /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec + /// that created the filter). This is useful to avoid computing expensive filter + /// expressions when no consumer will actually use them. + pub fn is_used(self: &Arc) -> bool { + // Strong count > 1 means at least one consumer is holding a reference beyond the producer. + Arc::strong_count(self) > 1 + } + fn render( &self, f: &mut std::fmt::Formatter<'_>, @@ -607,4 +617,47 @@ mod test { // wait_complete should return immediately dynamic_filter.wait_complete().await; } + + #[test] + fn test_is_used() { + // Create a dynamic filter + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(true) as Arc, + )); + + // Initially, only one reference exists (the filter itself) + assert!( + !filter.is_used(), + "Filter should not be used with only one reference" + ); + + // Simulate a consumer holding a reference (e.g., ParquetExec) + let consumer1 = Arc::clone(&filter); + assert!( + filter.is_used(), + "Filter should be used with a consumer reference" + ); + + // Multiple consumers + let consumer2 = Arc::clone(&filter); + assert!( + filter.is_used(), + "Filter should still be used with multiple consumers" + ); + + // Drop one consumer + drop(consumer1); + assert!( + filter.is_used(), + "Filter should still be used with remaining consumer" + ); + + // Drop all consumers + drop(consumer2); + assert!( + !filter.is_used(), + "Filter should not be used after all consumers dropped" + ); + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 03bf516eadd1..6e1980ea4f6e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -910,7 +910,21 @@ impl ExecutionPlan for HashJoinExec { consider using CoalescePartitionsExec or the EnforceDistribution rule" ); - let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); + // Only enable dynamic filter pushdown if: + // - The session config enables dynamic filter pushdown + // - A dynamic filter exists + // - At least one consumer is holding a reference to it, this avoids expensive filter + // computation when disabled or when no consumer will use it. + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_join_dynamic_filter_pushdown + && self + .dynamic_filter + .as_ref() + .map(|df| df.filter.is_used()) + .unwrap_or(false); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { From 8c467c9c7dec8d80aac6a6d8107dbd2eaec856e4 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Thu, 27 Nov 2025 17:56:55 +0100 Subject: [PATCH 2/4] Change Approach to Exact/Inexact/Unsupported --- .../filter_pushdown/util.rs | 11 +-- datafusion/datasource-parquet/src/source.rs | 26 ++++-- datafusion/datasource/src/file.rs | 3 +- datafusion/datasource/src/source.rs | 3 +- .../src/expressions/dynamic_filters.rs | 53 ----------- .../physical-optimizer/src/filter_pushdown.rs | 13 +-- datafusion/physical-plan/src/filter.rs | 14 +-- .../physical-plan/src/filter_pushdown.rs | 90 ++++++++++++++----- .../physical-plan/src/joins/hash_join/exec.rs | 73 +++++++-------- 9 files changed, 147 insertions(+), 139 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 30fd86440566..c8a046517801 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -220,12 +220,12 @@ impl FileSource for TestSource { ..self.clone() }); Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::Yes; filters.len()], + vec![PushedDown::Exact; filters.len()], ) .with_updated_node(new_node)) } else { Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } } @@ -542,8 +542,8 @@ impl ExecutionPlan for TestNode { let first_pushdown_result = self_pushdown_result[0].clone(); match &first_pushdown_result.discriminant { - PushedDown::No => { - // We have a filter to push down + PushedDown::Unsupported | PushedDown::Inexact => { + // We have a filter that wasn't exactly pushed down, so we need to apply it let new_child = FilterExec::try_new( Arc::clone(&first_pushdown_result.predicate), Arc::clone(&self.input), @@ -555,7 +555,8 @@ impl ExecutionPlan for TestNode { res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } - PushedDown::Yes => { + PushedDown::Exact => { + // Filter was exactly pushed down, no need to apply it again let res = FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index ad7474af80c2..ebf2d1ef04f9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -707,6 +707,9 @@ impl FileSource for ParquetSource { .into_iter() .map(|filter| { if can_expr_be_pushed_down_with_schemas(&filter, table_schema) { + // When pushdown_filters is true, filters will be used for row-level filtering + // When pushdown_filters is false, filters will be used for stats pruning only + // We'll mark them as supported here and adjust later based on pushdown_filters PushedDownPredicate::supported(filter) } else { PushedDownPredicate::unsupported(filter) @@ -715,19 +718,19 @@ impl FileSource for ParquetSource { .collect(); if filters .iter() - .all(|f| matches!(f.discriminant, PushedDown::No)) + .all(|f| matches!(f.discriminant, PushedDown::Unsupported)) { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )); } let allowed_filters = filters .iter() .filter_map(|f| match f.discriminant { - PushedDown::Yes => Some(Arc::clone(&f.predicate)), - PushedDown::No => None, + PushedDown::Exact => Some(Arc::clone(&f.predicate)), + PushedDown::Inexact | PushedDown::Unsupported => None, }) .collect_vec(); let predicate = match source.predicate { @@ -740,13 +743,24 @@ impl FileSource for ParquetSource { source = source.with_pushdown_filters(pushdown_filters); let source = Arc::new(source); // If pushdown_filters is false we tell our parents that they still have to handle the filters, - // even if we updated the predicate to include the filters (they will only be used for stats pruning). + // even though we updated the predicate to include the filters (they will be used for stats pruning only). + // In this case, we return Inexact for filters that can be pushed down (used for stats pruning) + // and Unsupported for filters that cannot be pushed down at all. if !pushdown_filters { + let result_discriminants = filters + .iter() + .map(|f| match f.discriminant { + PushedDown::Exact => PushedDown::Inexact, // Downgrade to Inexact (stats pruning only) + PushedDown::Unsupported => PushedDown::Unsupported, // Keep as Unsupported + _ => unreachable!("Only Exact or Unsupported expected at this point"), + }) + .collect(); return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + result_discriminants, ) .with_updated_node(source)); } + // If pushdown_filters is true, we return the original discriminants (Exact for supported filters) Ok(FilterPushdownPropagation::with_parent_pushdown_result( filters.iter().map(|f| f.discriminant).collect(), ) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9ec34b5dda0c..cb9fc5627a47 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -121,8 +121,9 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + // Default implementation: don't support filter pushdown Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index de79512a4101..8ff9b843177e 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -185,8 +185,9 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { + // Default implementation: don't support filter pushdown Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], + vec![PushedDown::Unsupported; filters.len()], )) } } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 8369e4725b6e..43a242472bec 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -294,16 +294,6 @@ impl DynamicFilterPhysicalExpr { .await; } - /// Check if this dynamic filter is being actively used by any consumers. - /// - /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec - /// that created the filter). This is useful to avoid computing expensive filter - /// expressions when no consumer will actually use them. - pub fn is_used(self: &Arc) -> bool { - // Strong count > 1 means at least one consumer is holding a reference beyond the producer. - Arc::strong_count(self) > 1 - } - fn render( &self, f: &mut std::fmt::Formatter<'_>, @@ -617,47 +607,4 @@ mod test { // wait_complete should return immediately dynamic_filter.wait_complete().await; } - - #[test] - fn test_is_used() { - // Create a dynamic filter - let filter = Arc::new(DynamicFilterPhysicalExpr::new( - vec![], - lit(true) as Arc, - )); - - // Initially, only one reference exists (the filter itself) - assert!( - !filter.is_used(), - "Filter should not be used with only one reference" - ); - - // Simulate a consumer holding a reference (e.g., ParquetExec) - let consumer1 = Arc::clone(&filter); - assert!( - filter.is_used(), - "Filter should be used with a consumer reference" - ); - - // Multiple consumers - let consumer2 = Arc::clone(&filter); - assert!( - filter.is_used(), - "Filter should still be used with multiple consumers" - ); - - // Drop one consumer - drop(consumer1); - assert!( - filter.is_used(), - "Filter should still be used with remaining consumer" - ); - - // Drop all consumers - drop(consumer2); - assert!( - !filter.is_used(), - "Filter should not be used after all consumers dropped" - ); - } } diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 8bed6c3aeba0..a1ca01c178d5 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -495,9 +495,10 @@ fn push_down_filters( let mut all_predicates = self_filtered.items().to_vec(); // Apply second filter pass: collect indices of parent filters that can be pushed down - let parent_filters_for_child = parent_filtered - .chain_filter_slice(&parent_filters, |filter| { - matches!(filter.discriminant, PushedDown::Yes) + // Push down filters that are either Exact or Inexact (both mean the child will use them) + let parent_filters_for_child = + parent_filtered.chain_filter_slice(&parent_filters, |filter| { + matches!(filter.discriminant, PushedDown::Exact | PushedDown::Inexact) }); // Add the filtered parent predicates to all_predicates @@ -535,7 +536,7 @@ fn push_down_filters( .collect_vec(); // Map the results from filtered self filters back to their original positions using FilteredVec let mapped_self_results = - self_filtered.map_results_to_original(all_filters, PushedDown::No); + self_filtered.map_results_to_original(all_filters, PushedDown::Unsupported); // Wrap each result with its corresponding expression let self_filter_results: Vec<_> = mapped_self_results @@ -548,7 +549,7 @@ fn push_down_filters( // Start by marking all parent filters as unsupported for this child for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { - parent_filter_pushdown_support.push(PushedDown::No); + parent_filter_pushdown_support.push(PushedDown::Unsupported); assert_eq!( parent_filter_pushdown_support.len(), child_idx + 1, @@ -557,7 +558,7 @@ fn push_down_filters( } // Map results from pushed-down filters back to original parent filter indices let mapped_parent_results = parent_filters_for_child - .map_results_to_original(parent_filters, PushedDown::No); + .map_results_to_original(parent_filters, PushedDown::Unsupported); // Update parent_filter_pushdown_supports with the mapped results // mapped_parent_results already has the results at their original indices diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 58185c8cdf5b..8d17b70691f1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -521,10 +521,11 @@ impl ExecutionPlan for FilterExec { if !matches!(phase, FilterPushdownPhase::Pre) { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } - // We absorb any parent filters that were not handled by our children + // We absorb any parent filters that were not handled by our children with exact filtering let unsupported_parent_filters = child_pushdown_result.parent_filters.iter().filter_map(|f| { - matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) + // Keep filters that weren't handled with exact filtering (Inexact or Unsupported) + (!matches!(f.all(), PushedDown::Exact)).then_some(Arc::clone(&f.filter)) }); let unsupported_self_filters = child_pushdown_result .self_filters @@ -532,8 +533,8 @@ impl ExecutionPlan for FilterExec { .expect("we have exactly one child") .iter() .filter_map(|f| match f.discriminant { - PushedDown::Yes => None, - PushedDown::No => Some(&f.predicate), + PushedDown::Exact => None, + PushedDown::Inexact | PushedDown::Unsupported => Some(&f.predicate), }) .cloned(); @@ -592,8 +593,11 @@ impl ExecutionPlan for FilterExec { Some(Arc::new(new) as _) }; + // FilterExec always applies filters exactly. + // Even if the child returned Inexact or Unsupported, we absorb those filters + // and apply them, so from the parent's perspective, the filter is handled exactly. Ok(FilterPushdownPropagation { - filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], + filters: vec![PushedDown::Exact; child_pushdown_result.parent_filters.len()], updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 1274e954eaeb..2f9d78d8da29 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -101,10 +101,26 @@ impl PushedDownPredicate { self.predicate } - /// Create a new [`PushedDownPredicate`] with supported pushdown. + /// Create a new [`PushedDownPredicate`] with exact pushdown. + /// The child will apply the filter exactly as FilterExec would. + pub fn exact(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::Exact, + predicate, + } + } + + /// Create a new [`PushedDownPredicate`] with supported pushdown (exact filtering). + /// This is an alias for `exact()` for backward compatibility. pub fn supported(predicate: Arc) -> Self { + Self::exact(predicate) + } + + /// Create a new [`PushedDownPredicate`] with inexact pushdown. + /// The child will use the filter for stats/partial pruning but not exact filtering. + pub fn inexact(predicate: Arc) -> Self { Self { - discriminant: PushedDown::Yes, + discriminant: PushedDown::Inexact, predicate, } } @@ -112,35 +128,57 @@ impl PushedDownPredicate { /// Create a new [`PushedDownPredicate`] with unsupported pushdown. pub fn unsupported(predicate: Arc) -> Self { Self { - discriminant: PushedDown::No, + discriminant: PushedDown::Unsupported, predicate, } } } /// Discriminant for the result of pushing down a filter into a child node. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PushedDown { - /// The predicate was successfully pushed down into the child node. - Yes, - /// The predicate could not be pushed down into the child node. - No, + /// The filter is applied exactly as FilterExec would apply it. + /// The parent can safely drop the filter as the child guarantees exact filtering. + /// This is typically used for row-level filtering in data sources or selective operators. + Exact, + /// The filter will be used, but the child cannot guarantee exact filtering. + /// This usually means the filter may be used for statistics-based pruning (e.g., row group + /// filtering in Parquet) but will not be used for row-level filtering. + Inexact, + /// The filter is not going to be used at all by the child. + Unsupported, } impl PushedDown { - /// Logical AND operation: returns `Yes` only if both operands are `Yes`. + /// Logical AND operation for combining filter pushdown results. + /// Returns the most restrictive (least capable) result: + /// - Exact AND Exact = Exact + /// - Exact AND Inexact = Inexact + /// - Exact AND Unsupported = Unsupported + /// - Inexact AND Inexact = Inexact + /// - Inexact AND Unsupported = Unsupported + /// - Unsupported AND Unsupported = Unsupported pub fn and(self, other: PushedDown) -> PushedDown { match (self, other) { - (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes, - _ => PushedDown::No, + (PushedDown::Exact, PushedDown::Exact) => PushedDown::Exact, + (PushedDown::Exact, PushedDown::Inexact) + | (PushedDown::Inexact, PushedDown::Exact) => PushedDown::Inexact, + (PushedDown::Inexact, PushedDown::Inexact) => PushedDown::Inexact, + _ => PushedDown::Unsupported, } } - /// Logical OR operation: returns `Yes` if either operand is `Yes`. + /// Logical OR operation for combining filter pushdown results. + /// Returns the least restrictive (most capable) result: + /// - Exact OR anything = Exact + /// - Inexact OR Inexact = Inexact + /// - Inexact OR Unsupported = Inexact + /// - Unsupported OR Unsupported = Unsupported pub fn or(self, other: PushedDown) -> PushedDown { match (self, other) { - (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes, - (PushedDown::No, PushedDown::No) => PushedDown::No, + (PushedDown::Exact, _) | (_, PushedDown::Exact) => PushedDown::Exact, + (PushedDown::Inexact, _) | (_, PushedDown::Inexact) => PushedDown::Inexact, + (PushedDown::Unsupported, PushedDown::Unsupported) => PushedDown::Unsupported, } } @@ -162,30 +200,36 @@ pub struct ChildFilterPushdownResult { impl ChildFilterPushdownResult { /// Combine all child results using OR logic. - /// Returns `Yes` if **any** child supports the filter. - /// Returns `No` if **all** children reject the filter or if there are no children. + /// Returns the best (most capable) result among all children. + /// - If any child supports Exact, returns Exact + /// - If any child supports Inexact (but none Exact), returns Inexact + /// - If all children are Unsupported, returns Unsupported + /// - If there are no children, returns Unsupported pub fn any(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PushedDown::No + PushedDown::Unsupported } else { self.child_results .iter() - .fold(PushedDown::No, |acc, result| acc.or(*result)) + .fold(PushedDown::Unsupported, |acc, result| acc.or(*result)) } } /// Combine all child results using AND logic. - /// Returns `Yes` if **all** children support the filter. - /// Returns `No` if **any** child rejects the filter or if there are no children. + /// Returns the worst (least capable) result among all children. + /// - If all children support Exact, returns Exact + /// - If all children support Exact or Inexact (but at least one Inexact), returns Inexact + /// - If any child is Unsupported, returns Unsupported + /// - If there are no children, returns Unsupported pub fn all(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PushedDown::No + PushedDown::Unsupported } else { self.child_results .iter() - .fold(PushedDown::Yes, |acc, result| acc.and(*result)) + .fold(PushedDown::Exact, |acc, result| acc.and(*result)) } } } @@ -263,7 +307,7 @@ impl FilterPushdownPropagation { let filters = child_pushdown_result .parent_filters .into_iter() - .map(|_| PushedDown::No) + .map(|_| PushedDown::Unsupported) .collect(); Self { filters, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6e1980ea4f6e..937041dac0da 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -24,7 +24,7 @@ use std::{any::Any, vec}; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + FilterPushdownPropagation, PushedDown, }; use crate::joins::hash_join::shared_bounds::{ ColumnBounds, PartitionBounds, SharedBuildAccumulator, @@ -915,16 +915,7 @@ impl ExecutionPlan for HashJoinExec { // - A dynamic filter exists // - At least one consumer is holding a reference to it, this avoids expensive filter // computation when disabled or when no consumer will use it. - let enable_dynamic_filter_pushdown = context - .session_config() - .options() - .optimizer - .enable_join_dynamic_filter_pushdown - && self - .dynamic_filter - .as_ref() - .map(|df| df.filter.is_used()) - .unwrap_or(false); + let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { @@ -1173,34 +1164,38 @@ impl ExecutionPlan for HashJoinExec { let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child // We expect 0 or 1 self filters if let Some(filter) = right_child_self_filters.first() { - // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said - // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating - let predicate = Arc::clone(&filter.predicate); - if let Ok(dynamic_filter) = - Arc::downcast::(predicate) - { - // We successfully pushed down our self filter - we need to make a new node with the dynamic filter - let new_node = Arc::new(HashJoinExec { - left: Arc::clone(&self.left), - right: Arc::clone(&self.right), - on: self.on.clone(), - filter: self.filter.clone(), - join_type: self.join_type, - join_schema: Arc::clone(&self.join_schema), - left_fut: Arc::clone(&self.left_fut), - random_state: self.random_state.clone(), - mode: self.mode, - metrics: ExecutionPlanMetricsSet::new(), - projection: self.projection.clone(), - column_indices: self.column_indices.clone(), - null_equality: self.null_equality, - cache: self.cache.clone(), - dynamic_filter: Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - }), - }); - result = result.with_updated_node(new_node as Arc); + // Only create the dynamic filter if the probe side will actually use it (Exact or Inexact). + // If it's Unsupported, don't compute the filter since it won't be used. + let will_be_used = !matches!(filter.discriminant, PushedDown::Unsupported); + + if will_be_used { + let predicate = Arc::clone(&filter.predicate); + if let Ok(dynamic_filter) = + Arc::downcast::(predicate) + { + // We successfully pushed down our self filter - we need to make a new node with the dynamic filter + let new_node = Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + dynamic_filter: Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + build_accumulator: OnceLock::new(), + }), + }); + result = result.with_updated_node(new_node as Arc); + } } } Ok(result) From 5195faa4273c638379d1bad21e844531916b3bbb Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Thu, 27 Nov 2025 18:42:11 +0100 Subject: [PATCH 3/4] Add test --- .../physical_optimizer/filter_pushdown/mod.rs | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 0903194b15a9..bb8d821720ab 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -35,6 +35,7 @@ use datafusion::{ }; use datafusion_catalog::memory::DataSourceExec; use datafusion_common::config::ConfigOptions; +use datafusion_common::JoinType; use datafusion_datasource::{ file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, PartitionedFile, }; @@ -55,6 +56,7 @@ use datafusion_physical_plan::{ coalesce_partitions::CoalescePartitionsExec, collect, filter::FilterExec, + joins::{HashJoinExec, PartitionMode}, repartition::RepartitionExec, sorts::sort::SortExec, ExecutionPlan, @@ -2609,3 +2611,53 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { ]; assert_batches_eq!(&expected, &batches); } + +#[test] +fn test_hash_join_dynamic_filter_with_unsupported_scan() { + // This test verifies that HashJoin doesn't create dynamic filters when the probe side + // returns Unsupported (e.g., a scan that can't use filters at all) + let schema = schema(); + + // Build side + let build_scan = TestScanBuilder::new(schema.clone()) + .with_support(true) + .build(); + + // Probe side: scan that doesn't support pushdown + let probe_scan = TestScanBuilder::new(schema.clone()) + .with_support(false) // This will create a test node that doesn't support filter pushdown + .build(); + + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + vec![( + Arc::new(Column::new_with_schema("a", &schema).unwrap()), + Arc::new(Column::new_with_schema("a", &schema).unwrap()), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + + let rule = FilterPushdown::new_post_optimization(); + let plan = rule.optimize(join, &config).unwrap(); + + // Optimized plan should not have a DynamicFilter placeholder in the probe node. + insta::assert_snapshot!( + format_plan_for_test(&plan), + @r" + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} From 728fde92251c23e1124f9bd29a9e9cdc395a90a6 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Thu, 27 Nov 2025 19:15:57 +0100 Subject: [PATCH 4/4] Fix clippy warning --- .../core/tests/physical_optimizer/filter_pushdown/mod.rs | 7 +++---- datafusion/execution/src/config.rs | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index a91284c64cd8..2de72a33d378 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -2661,11 +2661,10 @@ fn test_hash_join_dynamic_filter_with_unsupported_scan() { .unwrap(), ); - let mut config = ConfigOptions::default(); - config.optimizer.enable_dynamic_filter_pushdown = true; - + let config = SessionConfig::new(); let rule = FilterPushdown::new_post_optimization(); - let plan = rule.optimize(join, &config).unwrap(); + let ctx = OptimizerContext::new(config.clone()); + let plan = rule.optimize_plan(join, &ctx).unwrap(); // Optimized plan should not have a DynamicFilter placeholder in the probe node. insta::assert_snapshot!( diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 443229a3cb77..3fa602f12554 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -114,10 +114,10 @@ impl Default for SessionConfig { } /// A type map for storing extensions. -/// +/// /// Extensions are indexed by their type `T`. If multiple values of the same type are provided, only the last one /// will be kept. -/// +/// /// Extensions are opaque objects that are unknown to DataFusion itself but can be downcast by optimizer rules, /// execution plans, or other components that have access to the session config. /// They provide a flexible way to attach extra data or behavior to the session config.