From 120c3bed4b25423113ae69095fc6be15bf22f21b Mon Sep 17 00:00:00 2001 From: xuanyili Date: Mon, 18 May 2026 06:40:43 +0000 Subject: [PATCH] fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent `FilterPushdown::new_post_optimization()` was ANDing a fresh `DynamicFilterPhysicalExpr` onto the probe-side scan's predicate on every invocation. After N passes the probe-side data source carried `DynamicFilter AND DynamicFilter AND ...` (N copies of the same empty expression). Root cause: `HashJoinExec::gather_filters_for_pushdown` always created a new dynamic filter in the Post phase, regardless of whether the join already had one from a prior pass. The previous-pass filter is retained on the `HashJoinExec` itself (the `dynamic_filter` field is preserved through `with_new_children` via the builder) and the same `Arc` is wired into the probe-side scan's predicate, so a new one is redundant and stacks duplicates. Fix: skip dynamic filter creation when `self.dynamic_filter.is_some()`. The existing shared `Arc` keeps the probe-side scan correctly wired to the build-side accumulator that will populate it. Motivation: adaptive execution in datafusion-ballista AQE (apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer` chain after every completed stage. Unlike `OutputRequirements` (whose duplicate wrappers are masked by `new_remove_mode` later in the chain), this duplication survives to the executed plan and degrades scan performance with redundant filter evaluation. Adds `post_phase_is_idempotent_on_hash_join` to `datafusion/core/tests/physical_optimizer/filter_pushdown.rs`: builds a hash join over two parquet scans, invokes the rule twice, asserts the plan strings match. Fails before this fix (two `AND DynamicFilter` clauses on the probe side); passes after. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 41 +++++++++++++++++++ .../physical-plan/src/joins/hash_join/exec.rs | 8 +++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 4ff1fad8f52b9..7cda84c3dc437 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -3374,3 +3374,44 @@ fn test_filter_pushdown_through_sort_with_projection() { " ); } + +/// `FilterPushdown::new_post_optimization()` must be idempotent. When applied +/// to a HashJoinExec, the rule installs a dynamic filter on the probe-side +/// scan; before the fix in `HashJoinExec::gather_filters_for_pushdown`, every +/// invocation created a *new* `DynamicFilterPhysicalExpr` and ANDed it onto +/// the probe side's existing predicate, producing +/// `DynamicFilter AND DynamicFilter AND ...` after N passes. +/// +/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every +/// completed stage, so this would compound indefinitely without the guard. +#[test] +fn post_phase_is_idempotent_on_hash_join() { + use crate::physical_optimizer::test_utils::{ + hash_join_exec, parquet_exec, schema, + }; + use datafusion_common::JoinType; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; + use datafusion_physical_plan::get_plan_string; + use datafusion_physical_plan::joins::utils::JoinOn; + + let s = schema(); + let left = parquet_exec(Arc::clone(&s)); + let right = parquet_exec(Arc::clone(&s)); + let join_on: JoinOn = vec![( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()), + Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()), + )]; + let plan = hash_join_exec(left, right, join_on, None, &JoinType::Inner).unwrap(); + + let config = ConfigOptions::new(); + let rule = FilterPushdown::new_post_optimization(); + let once = rule.optimize(plan, &config).unwrap(); + let twice = rule.optimize(Arc::clone(&once), &config).unwrap(); + + assert_eq!( + get_plan_string(&once), + get_plan_string(&twice), + "second invocation of FilterPushdown::new_post_optimization mutated the plan", + ); +} diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4ebbf7cb31ccf..067f646a2cdbb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1640,8 +1640,14 @@ impl ExecutionPlan for HashJoinExec { ChildFilterDescription::all_unsupported(&parent_filters) }; - // Add dynamic filters in Post phase if enabled + // Add dynamic filters in Post phase if enabled. Skip when this join + // already carries a dynamic filter from a previous pass — the shared + // `Arc` is still wired into the probe-side + // scan's predicate, and re-creating it would AND a fresh duplicate + // onto every Post-phase invocation (apache/datafusion-ballista#1359 + // surfaces this in AQE replan loops). if phase == FilterPushdownPhase::Post + && self.dynamic_filter.is_none() && self.allow_join_dynamic_filter_pushdown(config) { // Add actual dynamic filter to right side (probe side)