perf: use fastrange instead of modulo for hash partition assignment (up to 1.15x improvement) #21830
Dandandan wants to merge 4 commits into apache:main
Replaces `hash % num_partitions` with Lemire's fastrange `((hash as u128) * n) >> 64` in `BatchPartitioner`'s hash path. A 64-bit hardware `div` is ~20-80 cycles and not pipelined; the mulhi+shift is ~4-6 cycles and fully pipelined, so this removes the div unit as a bottleneck on per-row partition assignment without changing the uniformity of the distribution.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
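To make the two mappings concrete, here is a minimal sketch of both in Rust. The name `hash_to_partition` is taken from a later commit message in this PR, but the signature shown is an assumption, and `hash_to_partition_modulo` is a hypothetical name used only for comparison; neither is copied from the actual diff.

```rust
/// Map a 64-bit hash into `0..num_partitions` with Lemire's fastrange.
/// Assumed signature for illustration; the helper in the PR may differ.
fn hash_to_partition(hash: u64, num_partitions: usize) -> usize {
    // Widen to 128 bits, multiply, and keep the high 64 bits of the product.
    // The result is floor(hash * n / 2^64), which is always < n.
    (((hash as u128) * (num_partitions as u128)) >> 64) as usize
}

/// The modulo mapping being replaced (hypothetical name, for comparison only).
fn hash_to_partition_modulo(hash: u64, num_partitions: usize) -> usize {
    (hash % num_partitions as u64) as usize
}
```

Both functions return a value in `0..num_partitions`, but they generally disagree on which one: fastrange is driven by the high bits of the hash, while modulo depends on the remainder.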
run benchmarks
🤖 Benchmark running (GKE): comparing repartition-fastrange (08d083c) to 85e75e2 (merge-base) using: clickbench_partitioned
🤖 Benchmark running (GKE): comparing repartition-fastrange (08d083c) to 85e75e2 (merge-base) using: tpcds
🤖 Benchmark running (GKE): comparing repartition-fastrange (08d083c) to 85e75e2 (merge-base) using: tpch
🤖 Benchmark completed (GKE). Resource usage: tpch, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: clickbench_partitioned, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: tpcds, base (merge-base) and branch
run benchmark clickbench_partitioned
🤖 Benchmark running (GKE): comparing repartition-fastrange (08d083c) to 85e75e2 (merge-base) using: clickbench_partitioned
run benchmark tpch10
FYI @gene-bordegaray, you might be interested.
🤖 Benchmark running (GKE): comparing repartition-fastrange (08d083c) to 85e75e2 (merge-base) using: tpch10
🤖 Benchmark completed (GKE). Resource usage: clickbench_partitioned, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: tpch10, base (merge-base) and branch
Nice, this seems to be quite an improvement on repartition-heavy queries.
run benchmarks
🤖 Benchmark running (GKE): comparing repartition-fastrange (f9f26be) to 85e75e2 (merge-base) using: tpcds
🤖 Benchmark running (GKE): comparing repartition-fastrange (f9f26be) to 85e75e2 (merge-base) using: tpch
🤖 Benchmark running (GKE): comparing repartition-fastrange (f9f26be) to 85e75e2 (merge-base) using: clickbench_partitioned
run benchmark tpch10
🤖 Benchmark running (GKE): comparing repartition-fastrange (f9f26be) to 85e75e2 (merge-base) using: tpch10
🤖 Benchmark completed (GKE). Resource usage: tpch, base (merge-base) and branch
Adds a doc comment on `hash_to_partition` pointing at Lemire's fastrange repo and blog post so future readers can find the reasoning behind the mulhi+shift partition mapping. Also removes the `#[allow(clippy::mutable_key_type)]` attribute and accompanying comment from `max_grouping_set_duplicate_ordinal`; clippy does not flag this site.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🤖 Benchmark completed (GKE). Resource usage: clickbench_partitioned, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: tpcds, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: tpch10, base (merge-base) and branch
run benchmarks
🤖 Benchmark running (GKE): comparing repartition-fastrange (de3d860) to 7d5ddca (merge-base) using: clickbench_partitioned
🤖 Benchmark running (GKE): comparing repartition-fastrange (de3d860) to 7d5ddca (merge-base) using: tpcds
🤖 Benchmark running (GKE): comparing repartition-fastrange (de3d860) to 7d5ddca (merge-base) using: tpch
🤖 Benchmark completed (GKE). Resource usage: tpch, base (merge-base) and branch
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 5 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 8 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition WHEN 2 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) WHEN 8 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ELSE false END ]
Why did the test change?
Fastrange assigns different partition numbers than modulo, so both the partition values and the partition calculation shown in the expected plan are different.
The statistics / expressions themselves are still the same though.
🤖 Benchmark completed (GKE). Resource usage: tpcds, base (merge-base) and branch
🤖 Benchmark completed (GKE). Resource usage: clickbench_partitioned, base (merge-base) and branch
Which issue does this PR close?

Rationale for this change
Lemire's fastrange `((hash as u128) * n) >> 64` produces a uniform mapping from a 64-bit hash into `0..n` using one 64×64→128 multiply plus a shift (~4–6 cycles, fully pipelined). The output is not the same partition number as `hash % n` for a given row, but the uniformity is equivalent for well-distributed hashes, which is all the partitioner cares about.

What changes are included in this PR?
`datafusion/physical-plan/src/repartition/mod.rs`: replace `hash % partitions` with fastrange in the hash-partitioning inner loop of `BatchPartitioner`.

Are these changes tested?
Covered by the existing repartition tests (`cargo test -p datafusion-physical-plan repartition`: 41 tests pass locally). No test pins the specific hash→partition mapping; they assert on counts/ordering invariants that fastrange preserves.

Are there any user-facing changes?
No public API changes. The one observable difference is that a given row may land on a different output partition than before for `Hash` partitioning. The distribution is still uniform, so downstream operators behave the same, but anything externally capturing exact per-partition row identity will shift.
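The uniformity claim can be sanity-checked with a small standalone sketch. It does not use DataFusion's hashing: SplitMix64 stands in as an assumed source of well-mixed 64-bit values, and `partition_counts` is a hypothetical helper that tallies rows per partition under each mapping.

```rust
/// One SplitMix64 step; used here only as a stand-in for a well-distributed hash.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Tally how many of `rows` synthetic hashes land in each of `n` partitions,
/// returning (fastrange counts, modulo counts). Hypothetical helper.
fn partition_counts(rows: usize, n: usize, seed: u64) -> (Vec<usize>, Vec<usize>) {
    let mut state = seed;
    let mut fast = vec![0usize; n];
    let mut modulo = vec![0usize; n];
    for _ in 0..rows {
        let h = splitmix64(&mut state);
        // fastrange: high 64 bits of the 128-bit product h * n.
        fast[(((h as u128) * (n as u128)) >> 64) as usize] += 1;
        // modulo: the mapping used before this PR.
        modulo[(h % n as u64) as usize] += 1;
    }
    (fast, modulo)
}
```

Calling `partition_counts(120_000, 12, 42)` should give roughly 10,000 rows per partition under both mappings, even though individual rows are assigned to different partitions. This only holds when the input hashes are well mixed in their high bits, since those are the bits fastrange relies on.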