
Adaptive (cost-based) decision for skip_partial_aggregation instead of fixed ratio threshold #22405

@zhuqi-lucas


Is your feature request related to a problem or challenge?

DataFusion's skip_partial_aggregation already samples at runtime: after skip_partial_aggregation_probe_rows_threshold rows (default 100k), it checks the measured group/row ratio and switches into skip mode if the ratio exceeds skip_partial_aggregation_probe_ratio_threshold (default 0.8).
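For reference, the current rule can be sketched as a one-shot probe. This is a minimal sketch, not DataFusion's code: FixedRatioProbe and its fields are hypothetical names, and the strict `>` follows the word "exceeds" above (the real comparison may differ). The assumption that the decision is made once and never revisited is inferred from the fact that periodic re-probing is proposed as new behavior.

```rust
/// Hypothetical sketch of the fixed-ratio probe; names do not mirror DataFusion.
struct FixedRatioProbe {
    probe_rows_threshold: usize, // default 100_000 per the issue
    probe_ratio_threshold: f64,  // default 0.8 per the issue
    input_rows: usize,
    num_groups: usize,
    decided: bool,
    should_skip: bool,
}

impl FixedRatioProbe {
    fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
        Self {
            probe_rows_threshold,
            probe_ratio_threshold,
            input_rows: 0,
            num_groups: 0,
            decided: false,
            should_skip: false,
        }
    }

    /// Record one input batch; `num_groups` is the total distinct groups seen so far.
    fn update(&mut self, batch_rows: usize, num_groups: usize) {
        if self.decided {
            return; // assumption: the decision is locked once made
        }
        self.input_rows += batch_rows;
        self.num_groups = num_groups;
        if self.input_rows >= self.probe_rows_threshold {
            let ratio = self.num_groups as f64 / self.input_rows as f64;
            self.should_skip = ratio > self.probe_ratio_threshold;
            self.decided = true;
        }
    }
}

fn main() {
    let mut probe = FixedRatioProbe::new(100_000, 0.8);
    // Hypothetical stream: 8192-row batches where ~56.5 % of rows open a new group.
    let mut groups = 0usize;
    for _ in 0..20 {
        groups += (8192.0 * 0.565) as usize;
        probe.update(8192, groups);
    }
    println!("should_skip = {}", probe.should_skip);
}
```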

The heuristic is too coarse for queries where partial aggregation is net-negative but the ratio stays below the 0.8 bar. A concrete example is ClickBench Q18:

SELECT "UserID",
       extract(minute FROM to_timestamp_seconds("EventTime")) AS m,
       "SearchPhrase",
       COUNT(*)
FROM hits
GROUP BY "UserID", m, "SearchPhrase"
ORDER BY COUNT(*) DESC
LIMIT 10;

On the standard 100M-row hits.parquet (12 partitions, M-series MacBook, release build, hot cache, median of 5 runs):

Config                                       Q18 elapsed
Default (ratio_threshold = 0.8)              2.72 s
ratio_threshold = 0.6, probe_rows = 5000     1.57 s (1.73× faster)

EXPLAIN ANALYZE shows why: the measured ratio is 0.565 (≈ 50.88 M groups / 90.12 M rows), well below the 0.8 cut-off, so partial aggregation keeps running. It costs 17 s of compute (summed across 12 partitions) yet reduces input by only ~40 %, so the final aggregate still has to process 60 M rows. With skipping enabled, partial aggregation drops to 1.27 s and the final stage also runs faster (better cache locality without the partial-state lookup overhead).
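The quoted figures are internally consistent, and a quick recomputation from the numbers in the issue shows it: 50.88 / 90.12 ≈ 0.565, 1 − 0.565 ≈ 43.5 % (the "~40 %" reduction), and 2.72 / 1.57 ≈ 1.73. One thing the check surfaces: 0.565 is also below 0.6, so the tuned configuration presumably switches to skip mode because its ratio is evaluated after only 5000 rows, early in the stream where a larger share of rows still open new groups. That is an inference, not something the EXPLAIN ANALYZE output above states.

```rust
/// Output/input ratio of the partial aggregation stage.
fn group_row_ratio(groups: f64, rows: f64) -> f64 {
    groups / rows
}

/// Speedup of `new_s` relative to `old_s` (both in seconds).
fn speedup(old_s: f64, new_s: f64) -> f64 {
    old_s / new_s
}

fn main() {
    // Figures quoted in the issue (EXPLAIN ANALYZE and the timing table).
    let ratio = group_row_ratio(50.88e6, 90.12e6); // ≈ 0.565
    let reduction_pct = (1.0 - ratio) * 100.0;     // ≈ 43.5 %
    let q18_speedup = speedup(2.72, 1.57);         // ≈ 1.73×

    println!("ratio = {ratio:.3}, reduction = {reduction_pct:.1} %, speedup = {q18_speedup:.2}x");
    // The default rule only skips when the ratio exceeds 0.8; 0.565 does not.
    println!("default rule skips: {}", ratio > 0.8);
}
```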

A single fixed threshold cannot capture this. The right decision depends on the absolute time saved by partial aggregation versus the absolute time spent doing it, which varies with input rate, group state size, hash function cost, and downstream work. A query with ratio 0.4 and small group state may still benefit from partial aggregation; another with ratio 0.6 and heavy variable-length keys may not.
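To make the last point concrete, here is one possible per-row cost model (an assumption for illustration, not something the issue specifies): with partial aggregation each input row pays the partial-aggregation cost plus `ratio` times the downstream cost; without it, each row pays the passthrough cost plus the full downstream cost. The nanosecond figures are invented for illustration and are not measurements.

```rust
/// Hypothetical per-row costs in nanoseconds (illustrative, not measured).
struct Costs {
    partial_agg_ns: f64, // hash-probe + insert per input row
    passthrough_ns: f64, // forwarding one row unchanged
    downstream_ns: f64,  // final-aggregate work per row it receives
}

/// True if partial aggregation lowers the total per-input-row cost.
/// `ratio` = rows emitted by the partial stage / rows it consumed.
fn partial_pays_off(ratio: f64, c: &Costs) -> bool {
    let with_partial = c.partial_agg_ns + ratio * c.downstream_ns;
    let without_partial = c.passthrough_ns + c.downstream_ns;
    with_partial < without_partial
}

/// Ratio below which partial aggregation pays off under this cost model.
fn break_even_ratio(c: &Costs) -> f64 {
    (c.passthrough_ns + c.downstream_ns - c.partial_agg_ns) / c.downstream_ns
}

fn main() {
    // Ratio 0.4, small fixed-width group state: cheap hash-probe + insert.
    let small_state = Costs { partial_agg_ns: 10.0, passthrough_ns: 1.0, downstream_ns: 20.0 };
    // Ratio 0.6, heavy variable-length keys: expensive hashing and state.
    let heavy_keys = Costs { partial_agg_ns: 40.0, passthrough_ns: 1.0, downstream_ns: 20.0 };

    println!("ratio 0.4, small state: keep partial = {}", partial_pays_off(0.4, &small_state)); // 18 < 21
    println!("ratio 0.6, heavy keys:  keep partial = {}", partial_pays_off(0.6, &heavy_keys)); // 52 > 21
    println!("break-even: {:.2} vs {:.2}", break_even_ratio(&small_state), break_even_ratio(&heavy_keys));
}
```

Under these made-up inputs the break-even ratio is 0.55 for the first profile and negative (never worth it) for the second, which illustrates the argument: the "right" threshold moves with the costs rather than sitting at a fixed value.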

Describe the solution you'd like

Replace the fixed ratio check with a cost-aware adaptive decision:

  1. Measure both sides during the probe window:
    • partial_agg_time_per_row: wall time spent on hash-probe + insert per input row
    • passthrough_time_per_row: extrapolated time of just forwarding the batch
    • output_rows_per_input_row: actual reduction factor
  2. Estimate the net benefit by comparing passthrough_time × output_rows (the cost of a larger downstream input) against partial_agg_time × input_rows (the cost of doing partial aggregation). If partial aggregation is net-negative beyond a margin, switch to skip mode.
  3. Re-probe periodically: re-evaluate every N batches, so a query whose distribution shifts mid-stream can switch back.
  4. Bound the overhead: the probe itself should add ≤ 1 % overhead in the worst case.
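The four steps above can be sketched as a small per-partition controller. This is a minimal sketch under stated assumptions: it reads step 2 as a comparison of total per-input-row cost with and without partial aggregation, which needs a downstream per-row cost estimate (downstream_ns_per_row) that step 1 does not list. That field, the margin, reprobe_every, and all type names are hypothetical. Timing itself is left to the caller (for example with std::time::Instant).

```rust
/// Hypothetical per-row measurements from one probe window (nanoseconds).
#[derive(Debug, Clone, Copy)]
struct WindowStats {
    input_rows: u64,
    output_rows: u64,
    partial_ns_per_row: f64,     // hash-probe + insert, timed by the caller
    passthrough_ns_per_row: f64, // extrapolated cost of forwarding the batch as-is
    downstream_ns_per_row: f64,  // assumption: estimated final-aggregate cost per row
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Partial,
    Skip,
}

struct AdaptiveProbe {
    mode: Mode,
    margin: f64,        // skip only when partial is this fraction costlier (0.1 = 10 %)
    reprobe_every: u64, // re-evaluate once every N batches
    batch_count: u64,
}

impl AdaptiveProbe {
    fn new(margin: f64, reprobe_every: u64) -> Self {
        // max(1) avoids a modulo-by-zero panic in is_probe_batch.
        Self { mode: Mode::Partial, margin, reprobe_every: reprobe_every.max(1), batch_count: 0 }
    }

    /// Called once per input batch; true if this batch should be timed through
    /// the partial-aggregation path (one batch in every `reprobe_every`).
    fn is_probe_batch(&mut self) -> bool {
        let probe = self.batch_count % self.reprobe_every == 0;
        self.batch_count += 1;
        probe
    }

    /// Compare per-input-row cost with and without partial aggregation.
    fn record(&mut self, w: &WindowStats) -> Mode {
        if w.input_rows == 0 {
            return self.mode; // nothing measured; keep the current mode
        }
        let ratio = w.output_rows as f64 / w.input_rows as f64;
        let with_partial = w.partial_ns_per_row + ratio * w.downstream_ns_per_row;
        let without_partial = w.passthrough_ns_per_row + w.downstream_ns_per_row;
        self.mode = if with_partial > without_partial * (1.0 + self.margin) {
            Mode::Skip
        } else {
            Mode::Partial
        };
        self.mode
    }
}

fn main() {
    let mut probe = AdaptiveProbe::new(0.1, 100);
    // Hypothetical window resembling a high-cardinality key set.
    let heavy = WindowStats { input_rows: 8192, output_rows: 4628, partial_ns_per_row: 60.0,
                              passthrough_ns_per_row: 2.0, downstream_ns_per_row: 40.0 };
    // Hypothetical later window after the distribution shifts to few groups.
    let light = WindowStats { input_rows: 8192, output_rows: 64, partial_ns_per_row: 15.0,
                              passthrough_ns_per_row: 2.0, downstream_ns_per_row: 40.0 };

    for batch in 0..300u64 {
        if probe.is_probe_batch() {
            // Stand-in for timing this batch through partial aggregation.
            let stats = if batch < 100 { heavy } else { light };
            let mode = probe.record(&stats);
            println!("batch {batch}: {mode:?}");
        }
    }
}
```

Timing one batch in every reprobe_every = 100 keeps the measured path to at most 1 % of batches, which is one way to approach the ≤ 1 % target in step 4; the real overhead would also include timer and bookkeeping costs. Because the probe batch runs through partial aggregation even in skip mode, a shifted distribution can switch the partition back.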

This subsumes the current ratio_threshold (which can stay as a fallback / safety check).
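One way to keep ratio_threshold as a fallback and safety check is sketched below, assuming a cost-based verdict only becomes available once enough timing data exists. The function name and the Option-based signature are hypothetical, and the "safety floor" choice (skip whenever today's rule would) is a design assumption rather than something the issue specifies.

```rust
/// Hypothetical combined decision. `cost_verdict` is `Some(skip?)` once the
/// cost model has usable timings, `None` before that.
fn should_skip(input_rows: u64, num_groups: u64, ratio_threshold: f64, cost_verdict: Option<bool>) -> bool {
    if input_rows == 0 {
        return false; // nothing observed yet
    }
    let ratio = num_groups as f64 / input_rows as f64;
    let ratio_rule = ratio > ratio_threshold; // today's fixed-threshold rule
    match cost_verdict {
        // Safety floor: never skip less aggressively than the current rule.
        Some(cost_skip) => cost_skip || ratio_rule,
        // Fallback: no usable timings (e.g. too few rows), behave as today.
        None => ratio_rule,
    }
}

fn main() {
    // Q18-like ratio (≈ 0.565) with the default 0.8 threshold.
    println!("{}", should_skip(90_120_000, 50_880_000, 0.8, None)); // false
    println!("{}", should_skip(90_120_000, 50_880_000, 0.8, Some(true))); // true
    println!("{}", should_skip(100, 90, 0.8, Some(false))); // true: 0.9 > 0.8
}
```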

Related code paths

  • datafusion/physical-plan/src/aggregates/row_hash.rs:441 (skip_aggregation_probe field)
  • datafusion/physical-plan/src/aggregates/row_hash.rs:635 (current probe-and-switch logic)
  • datafusion/common/src/config.rs (skip_partial_aggregation_probe_* options)
  • Tests for current behavior: test_skip_aggregation_after_first_batch and test_skip_aggregation_after_threshold in aggregates/mod.rs

Why this matters

  • ClickBench Q18 (and likely Q35 and other count-by-many-keys queries) hits this regularly
  • Removes a magic knob that users have to discover and tune per query
  • Aligns with the cost-based optimization direction the optimizer is moving toward elsewhere

Describe alternatives you've considered

  • Lower the default ratio_threshold to 0.6: a simple one-line change, but still a magic number, and it would need full ClickBench validation to rule out regressions elsewhere (note that the Q18 measurement above also lowered probe_rows to 5000). Could be a stepping stone toward the adaptive version.
  • Per-aggregate-shape heuristic (different thresholds for different group key types / aggregate functions): more rules to maintain, and it doesn't generalize.
  • Pure pass-through (always skip): would regress low-cardinality cases where partial aggregation is genuinely useful (Q35-shape, where ClientIP cardinality is moderate and partial aggregation compacts effectively).

Additional context
