Skip to content

fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent#22523

Open
wirybeaver wants to merge 1 commit into
apache:mainfrom
wirybeaver:fix-filter-pushdown-post-idempotence
Open

fix(physical-plan): make HashJoinExec dynamic filter pushdown idempotent#22523
wirybeaver wants to merge 1 commit into
apache:mainfrom
wirybeaver:fix-filter-pushdown-post-idempotence

Conversation

@wirybeaver
Copy link
Copy Markdown

Which issue does this PR close?

Related to apache/datafusion-ballista#1359

Rationale

Ballista's Adaptive Query Execution (AQE) planner re-invokes DataFusion's full PhysicalOptimizer chain after every completed stage. FilterPushdown::new_post_optimization() is not idempotent on plans containing HashJoinExec.

In the Post phase, HashJoinExec::gather_filters_for_pushdown unconditionally creates a new DynamicFilterPhysicalExpr and installs it on the probe-side child via with_self_filter. After pass 1 the join already carries a dynamic_filter: Some(...), and the shared Arc<DynamicFilterPhysicalExpr> is already wired into the probe-side scan's predicate. On pass 2 a second dynamic filter is created and ANDed onto the existing predicate, producing DynamicFilter AND DynamicFilter. Each subsequent pass adds another duplicate, compounding indefinitely in AQE replan loops.

What changes are included in this PR?

  • Guard in HashJoinExec::gather_filters_for_pushdown: skip dynamic-filter creation when self.dynamic_filter.is_some(), meaning a previous pass already installed one. The existing Arc remains valid and correctly wired into the probe-side scan.
  • Comment explaining why the guard is needed (AQE replan context).
  • Test post_phase_is_idempotent_on_hash_join in tests/physical_optimizer/filter_pushdown.rs: builds a HashJoinExec, runs FilterPushdown::new_post_optimization() twice, and asserts structural equality via get_plan_string.

Are these changes tested?

Yes. The new test fails without the fix (plan strings diverge due to duplicated dynamic filter predicates) and passes with it.

Are there any user-facing changes?

No. Dynamic filter pushdown is an internal optimization; the idempotence guard only affects re-optimization scenarios (AQE).

🤖 Generated with Claude Code

`FilterPushdown::new_post_optimization()` was ANDing a fresh
`DynamicFilterPhysicalExpr` onto the probe-side scan's predicate on
every invocation. After N passes the probe-side data source carried
`DynamicFilter AND DynamicFilter AND ...` (N copies of the same empty
expression).

Root cause: `HashJoinExec::gather_filters_for_pushdown` always created a
new dynamic filter in the Post phase, regardless of whether the join
already had one from a prior pass. The previous-pass filter is retained
on the `HashJoinExec` itself (the `dynamic_filter` field is preserved
through `with_new_children` via the builder) and the same
`Arc<DynamicFilterPhysicalExpr>` is wired into the probe-side scan's
predicate, so a new one is redundant and stacks duplicates.

Fix: skip dynamic filter creation when `self.dynamic_filter.is_some()`.
The existing shared `Arc` keeps the probe-side scan correctly wired to
the build-side accumulator that will populate it.

Motivation: adaptive execution in datafusion-ballista AQE
(apache/datafusion-ballista#1359) re-runs the entire `PhysicalOptimizer`
chain after every completed stage. Unlike `OutputRequirements` (whose
duplicate wrappers are masked by `new_remove_mode` later in the chain),
this duplication survives to the executed plan and degrades scan
performance with redundant filter evaluation.

Adds `post_phase_is_idempotent_on_hash_join` to
`datafusion/core/tests/physical_optimizer/filter_pushdown.rs`: builds a
hash join over two parquet scans, invokes the rule twice, asserts the
plan strings match. Fails before this fix (two `AND DynamicFilter`
clauses on the probe side); passes after.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant