perf: use DynComparator in sort-merge join (SMJ), microbenchmark queries up to 12% faster, TPC-H overall ~5% faster #21484
mbutrovich wants to merge 11 commits into apache:main from dyncomparator
Conversation
run benchmark smj

run benchmark tpch tpcds
🤖 Benchmark running (GKE): comparing dyncomparator (3947c7e) to 91c2e04 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing dyncomparator (3947c7e) to 91c2e04 (merge-base) using smj
🤖 Benchmark running (GKE): comparing dyncomparator (3947c7e) to 91c2e04 (merge-base) using tpch
🤖 Benchmark completed (GKE): tpch — base (merge-base) vs branch (results table not captured)
🤖 Benchmark completed (GKE): smj — base (merge-base) vs branch (results table not captured)
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch (results table not captured)
```rust
let stream_values = stream_batch.compare_key_values();
```

```rust
// Build comparator once for the batch pair
let cmp = JoinKeyComparator::new(
```
Note: piecewise merge join rebuilds the JoinKeyComparator on each resolve_classic_join re-entry (when output exceeds batch size mid-scan), even though the batch pair hasn't changed. Could lift it to ClassicPWMJStream and rebuild only in fetch_stream_batch, but the cost is one make_comparator call per ~8192 rows of output, so probably not worth the added lifecycle coupling. Left as-is for now.
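The caching structure that lifting would produce can be sketched std-only; the `Stream`/`fetch_batch_pair` names below are hypothetical stand-ins (not DataFusion's actual types), and a boxed closure stands in for arrow's `DynComparator`:

```rust
use std::cmp::Ordering;

// Std-only stand-in for arrow's DynComparator: type dispatch happens once
// at construction, the returned closure compares rows by index.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

fn make_comparator(left: Vec<i64>, right: Vec<i64>) -> DynComparator {
    Box::new(move |i, j| left[i].cmp(&right[j]))
}

struct Stream {
    // Cached comparator: rebuilt only when a new batch pair is fetched,
    // not on every re-entry into the join loop.
    cmp: Option<DynComparator>,
}

impl Stream {
    fn fetch_batch_pair(&mut self, left: Vec<i64>, right: Vec<i64>) {
        self.cmp = Some(make_comparator(left, right));
    }

    fn compare(&self, i: usize, j: usize) -> Ordering {
        (self.cmp.as_ref().expect("batch pair fetched"))(i, j)
    }
}

fn main() {
    let mut s = Stream { cmp: None };
    s.fetch_batch_pair(vec![1, 5, 9], vec![2, 5, 7]);
    assert_eq!(s.compare(0, 0), Ordering::Less);
    assert_eq!(s.compare(1, 1), Ordering::Equal);
    assert_eq!(s.compare(2, 2), Ordering::Greater);
}
```

As the comment notes, the saving is one `make_comparator` call per re-entry, so the extra lifecycle state may not pay for itself.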
I'm confused why I'm failing this extended test: I shouldn't have changed the memory consumption with this PR.
```diff
@@ -213,6 +213,7 @@ async fn sort_merge_join_spill() {
     .with_config(config)
     .with_disk_manager_builder(DiskManagerBuilder::default())
     .with_scenario(Scenario::AccessLogStreaming)
```
is_join_arrays_equal didn't support Dictionary keys — it hit the not_impl_err! fallthrough. This test was passing because that error counted as the expected "failure." JoinKeyComparator uses make_comparator which handles all Arrow types, so the query now correctly spills and succeeds.
If you add this on main:

```rust
.with_expected_errors(vec!["Unsupported data type in sort merge join comparator"])
```

the test passes, confirming it was failing for the wrong reason on main all along.
Claude's assessment:
The test's original intent was to verify SMJ fails with OOM when memory is too tight even for spilling. But at 1000 bytes with this data, spilling works fine — there's no "too tight to spill" threshold we can easily hit because the spill path writes to disk before the reservation grows.
The reservation system works like: try_grow fails → spill to disk → no reservation needed. As long as the disk manager works, there's no minimum memory floor that would cause OOM. Even at 1 byte, the first try_grow would fail and spill immediately.
So the test as-is can't be made to fail with OOM under SMJ spilling — it was only "working" because of the Dictionary bug.
I'm not sure if the test is worth keeping, but maybe that's a different PR.
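The try_grow-then-spill flow described above can be sketched std-only; all names here are hypothetical (the real path goes through DataFusion's memory reservation and disk manager):

```rust
// Sketch of the flow described above: when try_grow fails, buffered rows
// go to disk and no reservation is held, so a tiny memory limit alone
// never produces an OOM error under the spilling path.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err("memory exhausted".to_string());
        }
        self.used += bytes;
        Ok(())
    }
}

fn buffer_batch(res: &mut Reservation, spilled: &mut Vec<Vec<u8>>, batch: Vec<u8>) {
    if res.try_grow(batch.len()).is_err() {
        // Spill path: write to "disk" instead of holding memory.
        spilled.push(batch);
    }
}

fn main() {
    // Even a 1-byte limit just forces every batch to spill immediately.
    let mut res = Reservation { used: 0, limit: 1 };
    let mut spilled = Vec::new();
    buffer_batch(&mut res, &mut spilled, vec![0u8; 8192]);
    buffer_batch(&mut res, &mut spilled, vec![0u8; 8192]);
    assert_eq!(spilled.len(), 2);
    assert_eq!(res.used, 0);
}
```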
run benchmark tpch10
🤖 Benchmark running (GKE): comparing dyncomparator (e0d06ee) to 4b1901f (merge-base) using tpch10
Results look really good! I wonder if you thought about the following:
🤖 Benchmark completed (GKE): tpch10 — base (merge-base) vs branch (results table not captured)
Which issue does this PR close?
Closes the last item I had on #20910.
Rationale for this change
Sort merge join comparisons (`compare_join_arrays`, `is_join_arrays_equal`) do a `match DataType` + `downcast_ref` on every call, per column. These are called per-row in hot join loops across SMJ, semi/anti/mark SMJ, and piecewise merge join. `arrow_ord::ord::make_comparator` does the type dispatch once at construction and returns a `DynComparator` closure that goes straight to typed value comparison. Arrow's own `LexicographicalComparator` uses this pattern for sorting; we should use it for joins too.
What changes are included in this PR?
Adds `JoinKeyComparator` to `joins/utils.rs`: a thin wrapper around `Vec<DynComparator>` built once per batch pair. Null handling (`NullEqualsNothing` both-null -> `Less` override) is baked into the closures at construction time so `compare()` is a branchless loop.

Integrated into all hot-path call sites:

- `materializing_stream.rs`: `streamed_buffered_cmp` (streamed vs buffered) and `buffered_equality_cmp` (head vs tail equality)
- `bitwise_stream.rs`: `outer_inner_cmp`, `outer_self_cmp`, `inner_self_cmp`; simplified the `find_key_group_end` signature (takes `&JoinKeyComparator`, returns `usize` instead of `Result<usize>` since type errors are now caught at construction)
- `piecewise_merge_join/classic_join.rs`: single comparator built per batch pair

`compare_join_arrays` is kept for the one-off `keys_match` call (once per batch boundary).

Deleted `is_join_arrays_equal` (a 75-line per-row type dispatch function), replaced by `JoinKeyComparator::is_equal`.
Are these changes tested?
`JoinKeyComparator` unit tests cover multi-column mixed types, `NullEqualsNull`, `NullEqualsNothing`, and `nulls_first` ordering.
Are there any user-facing changes?
No.
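For illustration, the dispatch-once idea from the rationale can be sketched without arrow at all. `Value` below is a hypothetical stand-in for a dynamically typed array element, and only the integer arm is shown:

```rust
use std::cmp::Ordering;

// Stand-in for a dynamically typed Arrow array element.
#[derive(Clone)]
enum Value {
    Int(i64),
    Str(String),
}

// Old style: type dispatch repeated on every row comparison.
fn compare_per_row(a: &Value, b: &Value) -> Ordering {
    match (a, b) {
        (Value::Int(x), Value::Int(y)) => x.cmp(y),
        (Value::Str(x), Value::Str(y)) => x.cmp(y),
        _ => panic!("type mismatch"),
    }
}

// New style: dispatch once, return a closure that compares by row index
// with no further type checks.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

fn make_comparator(left: Vec<Value>, right: Vec<Value>) -> DynComparator {
    match (&left[0], &right[0]) {
        (Value::Int(_), Value::Int(_)) => Box::new(move |i, j| {
            let (Value::Int(x), Value::Int(y)) = (&left[i], &right[j]) else {
                unreachable!()
            };
            x.cmp(y)
        }),
        _ => unimplemented!("other types elided in this sketch"),
    }
}

fn main() {
    let l: Vec<Value> = (0..3).map(Value::Int).collect();
    let r = vec![Value::Int(1), Value::Int(1), Value::Int(1)];
    assert_eq!(compare_per_row(&l[0], &r[0]), Ordering::Less);
    let cmp = make_comparator(l, r);
    assert_eq!(cmp(2, 2), Ordering::Greater);
}
```

Per-row, the old style re-matches the type for every comparison; the closure pays that cost once per batch pair, which is where the microbenchmark wins come from.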