perf: sort-merge join (SMJ) batch deferred filtering and move mark joins to specialized stream #21184
mbutrovich wants to merge 18 commits into apache:main from
Conversation
run benchmarks sort_merge_join

Note that the 2 queries I expect a speedup on in the
🤖 Criterion benchmark running (GKE)
Benchmark for this request failed. Last 20 lines of output: (collapsed)
Also, I'm now confused about where I should add benchmarks. #20464 added Criterion SMJ benchmarks for sort-merge join, but it's missing scenarios from dfbench's
run benchmark smj |
|
🤖 Benchmark running (GKE)
|
🤖 Benchmark completed (GKE); results for smj (base vs. branch) collapsed
Yeah, these are expected results. The queries that demonstrate this issue are new in this PR, so we don't get to compare against main.
|
Can you please create a PR with only the queries so we can run the benchmark on
|
I'm noticing we don't have a ton of test coverage with spilling. I'll try to shore that up. |
|
|
Hi @mbutrovich, thanks for the request (#21184 (comment)). Only whitelisted users can trigger benchmarks. Allowed users: Dandandan, Jefffrey, Omega359, adriangb, alamb, comphead, etseidl, gabotechs, geoffreyclaude, klion26, kosiew, rluvaton, xudong963, zhuqi-lucas.
|
run benchmark smj |
|
🤖 Benchmark running (GKE)
|
I'm running another 50 iterations of fuzz tests now that I added one that spills, so that'll take ~90 minutes. So far I'm through 12 iterations, so I'll check back in once it's done. |
|
🤖 Benchmark completed (GKE); results for smj (base vs. branch) collapsed
You love to see it. |
|
Damn, well done |
This finished without issue. |
|
run benchmark smj |
|
🤖 Benchmark running (GKE). Comparing simplify_smj_full_opt (2d2758b) to 37978e3 (merge-base) using: smj
|
🤖 Benchmark completed (GKE); results for smj (base vs. branch) collapsed
Which issue does this PR close?
Partially addresses #20910.
Rationale for this change
Sort-merge join with a filter on outer joins (LEFT/RIGHT/FULL) runs `process_filtered_batches()` on every key transition in the Init state. With near-unique keys (1:1 cardinality), this means running the full deferred filtering pipeline (concat + `get_corrected_filter_mask` + `filter_record_batch_by_join_type`) once per row, making filtered LEFT/RIGHT/FULL 55x slower than INNER for 10M unique keys.

Additionally, mark join logic in `SortMergeJoinStream` materializes full (streamed, buffered) pairs only to discard most of them via `get_corrected_filter_mask()`. Mark joins are structurally identical to semi joins (one output row per outer row with a boolean result) and belong in `SemiAntiMarkSortMergeJoinStream`, which avoids pair materialization entirely by using a per-outer-batch bitset.

What changes are included in this PR?
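The rationale's mark-vs-semi observation can be illustrated with a minimal, dependency-free sketch. This is not DataFusion's actual API: `mark_join` and `semi_join` are hypothetical stand-ins showing that both joins compute the same per-outer-row bitset and differ only in how they emit it (mark emits all rows plus the bitset as a boolean column; semi filters by it), so neither needs to materialize (streamed, buffered) pairs.

```rust
/// One bit per outer row: did any buffered (inner) row match?
/// The real stream computes this during the sorted merge; a linear
/// scan stands in for that here.
fn mark_join(outer_keys: &[i64], inner_keys: &[i64]) -> Vec<bool> {
    outer_keys.iter().map(|k| inner_keys.contains(k)).collect()
}

/// Semi join = filter the outer rows by the same bitset.
/// Mark join would instead emit every outer row with the bitset
/// attached as an extra boolean column.
fn semi_join(outer_keys: &[i64], marks: &[bool]) -> Vec<i64> {
    outer_keys
        .iter()
        .zip(marks.iter())
        .filter(|&(_, &m)| m)
        .map(|(&k, _)| k)
        .collect()
}

fn main() {
    let outer = [1_i64, 2, 3, 4];
    let inner = [2_i64, 4, 5];
    let marks = mark_join(&outer, &inner);
    assert_eq!(marks, vec![false, true, false, true]);
    assert_eq!(semi_join(&outer, &marks), vec![2, 4]);
}
```

The shared bitset is why routing `LeftMark`/`RightMark` into the semi/anti stream removes the pair-materialization cost.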
Three areas of improvement, building on the specialized semi/anti stream from #20806:

1. Move mark joins to `SemiAntiMarkSortMergeJoinStream`

- Rename `semi_anti_sort_merge_join` → `semi_anti_mark_sort_merge_join`
- `emit_outer_batch()` emits all rows with the match bitset as a boolean column (vs. semi's filter / anti's invert-and-filter)
- Route `LeftMark`/`RightMark` from `SortMergeJoinExec::execute()` to the renamed stream
- Remove mark-specific handling from `SortMergeJoinStream` (`mark_row_as_match`, `is_not_null` column generation, mark arms in filter correction)

2. Batch filter evaluation in `freeze_streamed()`

- Split `freeze_streamed()` into null-joined classification plus `freeze_streamed_matched()` for batched materialization (slice → take → interleave)
- Single `RecordBatch` construction and single `expression.evaluate()` per freeze instead of per chunk
- `append_filter_metadata()` uses builder `extend()` instead of a per-element loop

3. Batch deferred filtering in Init state (this is the big win for Q22 and Q23)

- Run `process_filtered_batches()` once accumulated rows reach `batch_size` instead of running on every Init entry
- Buffering stays bounded (`freeze_dequeuing_buffered`, one accumulating toward the next freeze); this does not reintroduce the unbounded buffering fixed by PR #20482 (fix: SortMergeJoin don't wait for all input before emitting)
- The `Exhausted` state flushes any remainder

Cleanup:

- `SortMergeJoinStream` now handles only Inner/Left/Right/Full; all semi/anti/mark branching removed
- `get_corrected_filter_mask()`: merge identical Left/Right/Full branches; add null-metadata passthrough for already-null-joined rows
- `filter_record_batch_by_join_type()`: rewrite from `filter(true) + filter(false) + concat` to `zip()` for in-place null-joining, which preserves row ordering and removes `create_null_joined_batch()` entirely
- `filter_record_batch_by_join_type()`: use `compute::filter()` directly on the `BooleanArray` instead of wrapping it in a temporary `RecordBatch`

Benchmarks

`cargo run --release --bin dfbench -- smj`

General workload (Q1-Q20, various join types/cardinalities/selectivities): no regressions.
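The batching idea behind area 3 can be sketched in isolation. This is a hypothetical simplification, not DataFusion code: `DeferredFilter` stands in for the stream's accumulation state, `run_filter` stands in for the expensive concat + mask-correction + filter pipeline (here it just keeps even rows so the effect is observable), and `finish` models the `Exhausted`-state flush. The point is that the pipeline runs roughly once per `batch_size` rows rather than once per key transition.

```rust
/// Hypothetical model of "accumulate, then filter in batches".
struct DeferredFilter {
    batch_size: usize,
    pending: Vec<i64>,    // rows awaiting deferred filtering
    filter_runs: usize,   // how many times the expensive pipeline ran
    output: Vec<i64>,
}

impl DeferredFilter {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, pending: Vec::new(), filter_runs: 0, output: Vec::new() }
    }

    /// Called on every key transition; real work only at the threshold.
    fn push(&mut self, row: i64) {
        self.pending.push(row);
        if self.pending.len() >= self.batch_size {
            self.run_filter();
        }
    }

    /// Stand-in for the deferred filtering pipeline: keep even rows.
    fn run_filter(&mut self) {
        self.filter_runs += 1;
        self.output.extend(self.pending.drain(..).filter(|r| r % 2 == 0));
    }

    /// Exhausted state: flush whatever is left.
    fn finish(&mut self) {
        if !self.pending.is_empty() {
            self.run_filter();
        }
    }
}

fn main() {
    let mut f = DeferredFilter::new(4);
    for row in 0..10 {
        f.push(row);
    }
    f.finish();
    // 10 rows with batch_size 4: pipeline ran 3 times instead of 10.
    assert_eq!(f.filter_runs, 3);
    assert_eq!(f.output, vec![0, 2, 4, 6, 8]);
}
```

With near-unique keys, the per-key version pays the pipeline's fixed cost on almost every row, which is the 55x gap the rationale describes; batching amortizes it across `batch_size` rows.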
Are these changes tested?
Yes:
- `cargo test -p datafusion-physical-plan --lib joins::sort_merge_join`
- `cargo test -p datafusion-sqllogictest --test sqllogictests -- join`
- `cargo test -p datafusion-physical-plan --lib joins::semi_anti_mark_sort_merge_join`
- `cargo test -p datafusion --features extended_tests --test fuzz -- join_fuzz`

Are there any user-facing changes?
No.