perf: simplify HashJoinExec dynamic filter, drop CASE routing#21931
perf: simplify HashJoinExec dynamic filter, drop CASE routing#21931adriangb wants to merge 6 commits intoapache:mainfrom
Conversation
|
run benchmark tcph baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
run benchmark tcph baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (7a8272f) to main diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (7a8272f) to main diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (7a8272f) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (7a8272f) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
Hi @adriangb, your benchmark configuration could not be parsed (#21931 (comment)). Error: Supported benchmarks:
Usage: Per-side configuration ( env:
SHARED_SETTING: enabled
baseline:
ref: v45.0.0
env:
DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
ref: v46.0.0
env:
DATAFUSION_RUNTIME_MEMORY_LIMIT: 2GFile an issue against this benchmark runner |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
Hi @adriangb, your benchmark configuration could not be parsed (#21931 (comment)). Error: Supported benchmarks:
Usage: Per-side configuration ( env:
SHARED_SETTING: enabled
baseline:
ref: v45.0.0
env:
DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
ref: v46.0.0
env:
DATAFUSION_RUNTIME_MEMORY_LIMIT: 2GFile an issue against this benchmark runner |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (18129fe) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (18129fe) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
In Partitioned-mode HashJoinExec, when every reported partition's build
side uses a hash-table strategy, replace the routing CASE expression
(`CASE hash_repartition % N WHEN p THEN bounds AND hash_lookup ELSE
false END`) with `global_minmax AND multi_hash_lookup`.
The new MultiMapLookupExpr hashes the join keys once with HASH_JOIN_SEED
and ORs `contain_hashes()` across every partition's hash table,
eliminating both the routing-hash computation and the per-branch
re-hashing that CaseExpr does. Any non-Map partition (InList, Empty)
disqualifies the fast path and we use the legacy CASE unchanged; same
for partitions that were canceled before reporting build data.
Benchmarks (TPC-H SF=1, 7 iters back-to-back):
TOTAL min vs no-DF:
legacy CASE: +3.0%
multi_hash_lookup: +1.6% (~halves the regression)
Per-query (multi_hash_lookup vs CASE):
Q4 -6.0% Q5 -3.3% Q7 -6.0% Q8 -4.0%
Q9 -3.5% Q12 -3.2% Q17 -1.6% Q21 -3.8%
Refs: apache#19858
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…on IN (SET) When every reported partition for a Partitioned hash join uses InList pushdown and the cross-partition union would be ≤ 20 array entries, concatenate the per-partition `ArrayRef`s and emit `global_minmax AND struct(c0,c1,…) IN (SET)` instead of the routing CASE. The cap is set so the merged set can participate in parquet stats / bloom-filter pruning at the scan, which a per-partition CASE or a `multi_hash_lookup` cannot. A TPC-H SF=1 cap sweep (cap=20/50/100/200/2000) confirmed 20–50 is the sweet spot — past ~200 the larger static_filter hash set blows out of L1 and runtime regresses below the legacy CASE. The tightened path also subsumes the `force_hash_collisions` optimization (when the runtime collapses every key into one partition we get the same shape, just for a different reason) so both `#[cfg]` snapshot branches in test_hashjoin_dynamic_filter_pushdown_partitioned now produce the merged `IN (SET)` form. Refs: apache#19858 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpcds File an issue against this benchmark runner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpcds File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing HEAD (f717a99) to main diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
@gene-bordegaray @Dandandan I'm curious what you think of this. It's a bit of a compromise:
What this gets us is:
|
Which issue does this PR close?
Rationale for this change
Today the
Partitioned-modeHashJoinExecbuilds a dynamic filter that's structured around the repartition layout:Two problems with this:
hash_repartition % Neven though the partition's hash table will be probed anyway with a different seed (HASH_JOIN_SEED).This PR replaces the routing CASE with a structure that depends only on the content of the build side, not its layout, and adds a cross-partition merged `IN (SET)` fast path so small joins can participate in parquet stats / bloom-filter pruning at the scan side.
What changes are included in this PR?
Five commits:
Final filter-shape matrix
No more `CASE`, no more `hash_repartition`, no more `REPARTITION_RANDOM_STATE` in the dynamic-filter path.
Performance
Per-row cost (Partitioned mode)
Let `N` = number of build-side partitions. For each probe row evaluated by the dynamic filter:
Two countervailing forces shape the result:
Benchmarks (TPC-H, runner = c4a-highmem-16, ARM Neoverse-V2, 16 cores)
Triggered four runs covering both N regimes and both pushdown configs.
Default partitioning (N ≈ ncores ≈ 16)
In the `pushdown=true` run the wins concentrate on queries where the dynamic filter feeds parquet stats / bloom-filter pruning at the scan:
Q17 alone is 109 ms of the 93 ms total wall-clock improvement — that's the original issue's regression, fixed. The small-query regressions (Q3/Q5/Q9/Q13/Q14) are the all-Map shape paying `O(N)` probes per row for moderate-to-large build sides where the bounds prefix doesn't prune much.
High partition count (`target_partitions=128`)
A stress test of partition-count scaling on the same 16-core box.
Pushdown=false @ N=128: 11 queries faster, 0 slower, 11 unchanged. `multi_hash_lookup` cleanly beats the legacy 128-branch `CaseExpr` evaluation. Big wins on Q3 (1.68×), Q5 (1.82×), Q7 (1.75×), Q8 (2.15×), Q9 (1.64×), Q12 (1.81×), Q13 (1.76×), Q17 (1.59×), Q18 (1.29×), Q20 (2.05×), Q21 (1.43×).
Pushdown=true @ N=128: 4 faster, 9 slower. This is the case where the filter runs in the scan hot loop and `multi_hash_lookup`'s `O(N)` probes per row dominate. The wins (Q17 1.87×, Q18 1.39×, Q20 1.76×) survive because their merged `IN (SET)` prunes whole row groups before the per-row filter ever runs. The losses (Q5 1.88×, Q9 1.64×, Q21 1.62×, Q14 1.51×, Q8 1.37×) are the same all-Map shape paying 128 probes per row.
Summary of the regime grid
Three of the four configs are wins (one big), one is a regression. The single regressing config is `high N + scan-side pushdown`, which is exactly the scenario `OptionalFilterPhysicalExpr` from #20363 is designed to absorb: the adaptive tracker would measure `multi_hash_lookup`'s low `bytes_pruned_per_second_of_eval_time` for queries like Q5/Q9/Q21 and drop the filter, while keeping Q17/Q18/Q20 (which prune scans aggressively).
A possible structural follow-up — re-introducing partition routing inside `MultiMapLookupExpr` (1 routing hash + 1 probe, so per-row cost matches legacy CASE) — would close the regression at any N, with or without #20363.
Are these changes tested?
Are there any user-facing changes?
🤖 Generated with Claude Code