perf: selective column concat in hash join build side #21735
SubhamSinghal wants to merge 2 commits into apache:main
Conversation
@Dandandan can you please review this PR?
run benchmark tpch10 tpcds
Wouldn't the unneeded columns already be removed by projection pushdown?
🤖 Benchmark running (GKE): comparing selective-column-concat-hash-join (2fd3f61) to 3aaf393 (merge-base) using tpch10
🤖 Benchmark running (GKE): comparing selective-column-concat-hash-join (2fd3f61) to 3aaf393 (merge-base) using tpcds
🤖 Benchmark completed (GKE): tpch10 results, base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds results, base (merge-base) vs branch
Right, totally missed this. I think this PR is not adding any value. Closing it.
Which issue does this PR close?
Related to: #18942
Rationale for this change
In `CollectLeft` hash joins, `concat_batches` copies all columns from the build side into a single `RecordBatch`, even when only a subset is needed for the join output, filter evaluation, and key computation. For wide tables (20+ columns), this wastes significant memory and CPU.

Savings = (total_columns - needed_columns) / total_columns. For example:
- a 20-column table needing 3 columns: skips 85% of the copy
- a 10-column table needing 8 columns: skips 20% of the copy
- a 5-column table needing 5 columns: skips 0% (short-circuit)
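The savings arithmetic above can be checked with a tiny sketch. `copy_savings` is a hypothetical helper, not part of the PR; it just restates the formula:

```rust
/// Hypothetical helper (not in the PR): fraction of the build-side copy
/// that projecting down to `needed` of `total` columns would skip.
fn copy_savings(total: usize, needed: usize) -> f64 {
    assert!(needed <= total && total > 0);
    (total - needed) as f64 / total as f64
}

fn main() {
    // The three examples from the rationale above.
    assert!((copy_savings(20, 3) - 0.85).abs() < 1e-9);
    assert!((copy_savings(10, 8) - 0.20).abs() < 1e-9);
    // All columns needed: nothing to skip, so the code short-circuits.
    assert_eq!(copy_savings(5, 5), 0.0);
}
```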
This PR projects build-side batches to only the needed columns before `concat_batches`, reducing both peak memory and copy time.

What changes are included in this PR?
`datafusion/physical-plan/src/joins/hash_join/exec.rs`:
- `compute_build_side_projection()`: determines which build-side columns are actually needed (union of output columns, filter columns, and join key expression columns)
- `remap_column_indices()`: translates original column indices to projected positions
- `evaluate_and_concat_per_batch()`: evaluates join key expressions per-batch before projection, then concatenates result arrays (only used when projection is active)
- Updates `collect_left_input()` and `try_create_array_map()` to project batches before `concat_batches` when a column subset suffices
- Adds a `build_column_remap` field to `JoinLeftData` to carry the remap table downstream

`datafusion/physical-plan/src/joins/hash_join/stream.rs`:
- In `collect_build_side()`, remaps `column_indices` and filter `column_indices` when build-side projection is active

Are these changes tested?
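The project-then-concat-then-remap flow can be sketched with simplified types: plain `Vec`s stand in for Arrow arrays (the real code works with `RecordBatch` and arrow's `concat_batches`), and the function names mirror the PR's helpers, but the bodies here are illustrative assumptions, not DataFusion code:

```rust
use std::collections::HashMap;

// A build-side batch, simplified: one Vec<i32> per column.
type Batch = Vec<Vec<i32>>;

/// Project each batch down to `needed` columns (original indices, sorted),
/// then concatenate row-wise. Sketch of "project before concat_batches".
fn project_and_concat(batches: &[Batch], needed: &[usize]) -> Batch {
    let mut out: Batch = vec![Vec::new(); needed.len()];
    for batch in batches {
        for (pos, &col) in needed.iter().enumerate() {
            out[pos].extend_from_slice(&batch[col]);
        }
    }
    out
}

/// Translate original column indices to their positions in the projected
/// batch; this is the role the PR's remap_column_indices() plays.
fn remap_column_indices(needed: &[usize]) -> HashMap<usize, usize> {
    needed.iter().enumerate().map(|(pos, &col)| (col, pos)).collect()
}

fn main() {
    // Two 4-column batches; only columns 0 and 2 are needed downstream,
    // so columns 1 and 3 are never copied.
    let b1: Batch = vec![vec![1, 2], vec![10, 20], vec![100, 200], vec![7, 7]];
    let b2: Batch = vec![vec![3], vec![30], vec![300], vec![7]];
    let needed = [0, 2];

    let concat = project_and_concat(&[b1, b2], &needed);
    assert_eq!(concat, vec![vec![1, 2, 3], vec![100, 200, 300]]);

    let remap = remap_column_indices(&needed);
    assert_eq!(remap[&2], 1); // original column 2 now lives at position 1
}
```

Downstream consumers (output `column_indices`, filter columns) then look up positions through the remap table instead of using original indices.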
Yes — covered by existing tests
No new tests added since this is an internal optimization that doesn't change observable behavior. The existing test suite covers all join types, partition modes, filter combinations, empty build sides, and outer join unmatched-row handling.
Are there any user-facing changes?
No.