Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3374,3 +3374,44 @@ fn test_filter_pushdown_through_sort_with_projection() {
"
);
}

/// Regression test: running `FilterPushdown::new_post_optimization()` twice
/// over a plan containing a `HashJoinExec` must yield the same plan as
/// running it once.
///
/// The rule installs a dynamic filter on the probe-side scan of the join.
/// Prior to the guard added in `HashJoinExec::gather_filters_for_pushdown`,
/// each pass built a brand-new `DynamicFilterPhysicalExpr` and ANDed it onto
/// whatever predicate the probe side already had, so N passes left behind
/// `DynamicFilter AND DynamicFilter AND ...`.
///
/// This matters for AQE (datafusion-ballista#1359), which re-runs the
/// optimizer chain after every completed stage; without idempotence the
/// predicate would keep growing.
#[test]
fn post_phase_is_idempotent_on_hash_join() {
    use crate::physical_optimizer::test_utils::{
        hash_join_exec, parquet_exec, schema,
    };
    use datafusion_common::JoinType;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
    use datafusion_physical_plan::{get_plan_string, joins::utils::JoinOn};

    // Two scans over the same schema, equi-joined on column `a`.
    let table_schema = schema();
    let left_scan = parquet_exec(Arc::clone(&table_schema));
    let right_scan = parquet_exec(Arc::clone(&table_schema));

    let left_key = Column::new_with_schema("a", &left_scan.schema()).unwrap();
    let right_key = Column::new_with_schema("a", &right_scan.schema()).unwrap();
    let join_on: JoinOn = vec![(Arc::new(left_key), Arc::new(right_key))];

    let join_plan =
        hash_join_exec(left_scan, right_scan, join_on, None, &JoinType::Inner)
            .unwrap();

    // Apply the Post-phase rule, then apply it again to its own output.
    let config = ConfigOptions::new();
    let rule = FilterPushdown::new_post_optimization();
    let once = rule.optimize(join_plan, &config).unwrap();
    let twice = rule.optimize(Arc::clone(&once), &config).unwrap();

    // Compare the rendered plans: the second pass must change nothing.
    let plan_after_first_pass = get_plan_string(&once);
    let plan_after_second_pass = get_plan_string(&twice);
    assert_eq!(
        plan_after_first_pass, plan_after_second_pass,
        "second invocation of FilterPushdown::new_post_optimization mutated the plan",
    );
}
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1640,8 +1640,14 @@ impl ExecutionPlan for HashJoinExec {
ChildFilterDescription::all_unsupported(&parent_filters)
};

// Add dynamic filters in Post phase if enabled
// Add dynamic filters in Post phase if enabled. Skip when this join
// already carries a dynamic filter from a previous pass — the shared
// `Arc<DynamicFilterPhysicalExpr>` is still wired into the probe-side
// scan's predicate, and re-creating it would AND a fresh duplicate
// onto that predicate on every Post-phase invocation
// (apache/datafusion-ballista#1359 surfaces this in AQE replan loops).
if phase == FilterPushdownPhase::Post
&& self.dynamic_filter.is_none()
&& self.allow_join_dynamic_filter_pushdown(config)
{
// Add actual dynamic filter to right side (probe side)
Expand Down