feat: in-memory radix partitioning in grouped hash aggregate (draft) #21856
Dandandan wants to merge 1 commit into apache:main from radix-partitioned-aggregate
Conversation
When the grouped hash aggregate's working set outgrows a thread's share
of last-level cache, flush the in-memory hash table into a fixed number
of bucketed runs (hashed on the grouping columns) and re-aggregate each
bucket independently with a fresh, cache-resident hash table after the
input is drained. This avoids the L3-thrashing regime where the single
hash table grows beyond cache and ultimately spills to disk.
Matches the design from Müller et al., SIGMOD 2015 ("Cache-Efficient
Aggregation: Hashing Is Sorting"): hashing on small inputs that fit in
cache, partitioning to make them small. The trigger is the working-set
size, not memory-pool exhaustion — by the time the pool is full the
hash table has been thrashing cache for a while.
Two new config options:
- aggregate_radix_partitioned (default true)
- aggregate_radix_partitioned_threshold_bytes (default 32 MiB)
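A sketch of setting the two options above per-session from Rust; `set_bool`/`set_u64` are existing `SessionConfig` setters, but the keys themselves exist only with this patch applied:

```rust
use datafusion::prelude::*;

fn main() {
    // Sketch: opt out of the radix path, or raise its trigger threshold.
    // Both keys are added by this patch; absent from released DataFusion.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.aggregate_radix_partitioned", false)
        .set_u64(
            "datafusion.execution.aggregate_radix_partitioned_threshold_bytes",
            64 * 1024 * 1024, // e.g. 64 MiB instead of the 32 MiB default
        );
    let _ctx = SessionContext::new_with_config(config);
}
```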
Tests added for high-cardinality, low-cardinality, and single-group
inputs, all asserting the radix path matches the non-radix path.
ClickBench preliminary numbers (single hits.parquet, 3 iterations):
overall ~1% faster (37.9s → 37.5s), with notable wins on Q19 (1.5x)
and Q32 (1.4x), but regressions on Q4–Q9, Q13–Q15 from the threshold
firing on hash tables that already fit in cache. The threshold and
the trigger heuristic both need tuning before flipping the default;
this is a draft.
Disk-spill fallback for an oversized single bucket is not yet
implemented — surfaces an error today.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🤖 Benchmark bot (GKE): "run benchmarks" triggered tpch, tpcds, and clickbench_partitioned runs comparing radix-partitioned-aggregate (6a2a602) to 65f337d (merge-base); all three completed.
Which issue does this PR close?
Rationale for this change
Apache DataFusion's grouped hash aggregate today scales beyond a thread's L3 cache by letting the in-memory hash table grow unbounded, then spilling to disk on memory-pool exhaustion. Between "fits in L3" and "memory pool exhausted" there is a wide regime where the hash table thrashes cache (roughly one cache miss per row) with no defense.
This PR adds a cache-efficient fallback inspired by Müller et al., Cache-Efficient Aggregation: Hashing Is Sorting (SIGMOD 2015): when the working set outgrows a thread's share of last-level cache, the operator radix-partitions the in-memory partial-aggregate state into a fixed number of bucketed runs, and after the input is drained re-aggregates each bucket independently with a fresh, cache-resident hash table. Each bucket holds ~K/32 groups, so it stays small.
The trigger is the working-set size (sized to L3), not memory-pool exhaustion: by the time the pool is full, the hash table has been thrashing cache for a while.
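A minimal sketch of the bucketing step under assumed names (`radix_bucket` and `flush_into_buckets` are hypothetical; the PR's actual flush routes `RecordBatch`es of partial-aggregate state, not bare row indices):

```rust
/// 32 buckets addressed by 5 bits of the group hash (from the PR).
const NUM_RADIX_PARTITIONS: usize = 32;

/// Hypothetical helper: pick a bucket from the high 5 bits of the group
/// hash so the choice stays independent of the low bits the hash table
/// probes with; the exact bit range the PR uses may differ.
fn radix_bucket(group_hash: u64) -> usize {
    (group_hash >> (u64::BITS - 5)) as usize
}

/// Hypothetical flush: route each group's row index to its bucketed run.
/// Re-aggregating one bucket later touches only ~K/32 groups, so each
/// per-bucket hash table stays cache-resident.
fn flush_into_buckets(group_hashes: &[u64]) -> Vec<Vec<usize>> {
    let mut buckets = vec![Vec::new(); NUM_RADIX_PARTITIONS];
    for (row, &hash) in group_hashes.iter().enumerate() {
        buckets[radix_bucket(hash)].push(row);
    }
    buckets
}
```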
What changes are included in this PR?
- `OutOfMemoryMode::RadixPartition` for non-Partial modes with `GroupOrdering::None`.
- `RadixPartitionState` holding `Vec<Vec<RecordBatch>>` indexed by 5-bit hash bucket (`NUM_RADIX_PARTITIONS = 32`).
- `ReadingInput` poll path: after each ingested batch, if `group_values.size() + accumulators.size() > threshold`, flush the hash table into bucketed runs and continue (see the sketch after this list).
- Bucket drain reuses the `is_stream_merging` machinery (each bucket's runs become a `BucketStream` that flows back through `merge_batch`).
- `datafusion.execution.aggregate_radix_partitioned` (bool, default true)
- `datafusion.execution.aggregate_radix_partitioned_threshold_bytes` (default 32 MiB)

Known limitations (draft)
The disk-spill fallback for an oversized single bucket is not yet implemented; it surfaces an error today.
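A minimal sketch of that poll-path decision under assumed names (the enum and byte-size parameters are illustrative, not the PR's types):

```rust
/// Hypothetical mirror of the check made after each ingested batch.
enum MemoryAction {
    /// Working set still fits the configured budget: keep aggregating.
    KeepAggregating,
    /// Working set too big: radix-partition the in-memory state into
    /// bucketed runs, reset the hash table, and continue reading input.
    FlushIntoBuckets,
}

fn after_batch(
    group_values_bytes: usize,
    accumulators_bytes: usize,
    threshold_bytes: usize,
) -> MemoryAction {
    if group_values_bytes + accumulators_bytes > threshold_bytes {
        MemoryAction::FlushIntoBuckets
    } else {
        MemoryAction::KeepAggregating
    }
}
```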
Are these changes tested?
Three new unit tests in `datafusion-physical-plan` (`test_radix_partitioned_high_cardinality`, `_low_cardinality`, `_single_group`) drive a `Single`-mode aggregate with the cache-size threshold pinned to 64 bytes (so the radix flush fires repeatedly) and assert the output multiset matches the non-radix path on the same input.
The pre-existing spill-specific tests (`test_aggregate_with_spill_if_necessary`, `test_sort_reservation_fails_during_spill`, `aggregate_source_*_with_spill`) explicitly disable the flag in their `TaskContext` so they continue to exercise the disk-spill code path.
All 89 `aggregates::tests` pass.
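Not the PR's unit tests, but a minimal end-to-end sketch of the same equivalence check through the public API (`data.csv` and the column names are hypothetical; the config key is the one this PR introduces):

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Run the same GROUP BY with the radix path toggled on or off.
async fn run(radix: bool) -> Result<Vec<RecordBatch>> {
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.aggregate_radix_partitioned", radix);
    let ctx = SessionContext::new_with_config(config);
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    ctx.sql("SELECT k, count(*) FROM t GROUP BY k ORDER BY k")
        .await?
        .collect()
        .await
}

#[tokio::main]
async fn main() -> Result<()> {
    // Both paths must produce identical results on the same input.
    assert_eq!(run(true).await?, run(false).await?);
    Ok(())
}
```

Are there any user-facing changes?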
The two new config keys appear in `information_schema.df_settings` and `docs/source/user-guide/configs.md`. No public Rust API changes.
Preliminary ClickBench numbers (single `hits.parquet`, 3 iterations): overall ~1% faster (37.9s → 37.5s).
Notable wins: Q19 (1.5x) and Q32 (1.4x), both high-cardinality aggregations (one a `GROUP BY URL`, ~10M distinct).
Notable regressions: Q4–Q9 and Q13–Q15.
The regression pattern is consistent with a too-low threshold: queries whose per-thread hash table sits just above 32 MiB get a forced extra round trip through bucket drain even though the data fits fine in L3. Threshold tuning plus a reduction-factor gate (only flush if recent batches show poor aggregation; sketched below) are the obvious next steps before this is ready for merge.
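A sketch of what that reduction-factor gate could look like (entirely hypothetical; the names and the 2x cutoff are placeholders, not part of this PR):

```rust
/// Hypothetical gate: only allow a radix flush when recent batches show
/// poor aggregation, i.e. most input rows are minting new groups rather
/// than collapsing into existing ones.
fn poor_aggregation(recent_input_rows: usize, recent_new_groups: usize) -> bool {
    // Placeholder cutoff: flush only below a 2x reduction factor.
    recent_new_groups * 2 > recent_input_rows
}
```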
Follow-ups
- Integrate with `SkipAggregationProbe` so we don't flush a hash table that is aggregating well.
- Re-run benchmarks with `--iterations 5` or more and on the partitioned dataset for less variance.

🤖 Generated with Claude Code