feat(aqe): dynamic switching from streaming agg to hash aggregation#1722

Closed
wirybeaver wants to merge 1 commit into apache:main from wirybeaver:aggswitch

Conversation

@wirybeaver

Which issue does this PR close?

Part of #1359 (AQE epic) — roadmap line: "switch from streaming aggregation to hash aggregation (extended rules)"

What does this PR do?

Adds DynamicAggregateAlgorithmRule, an AQE physical optimizer rule that re-derives InputOrderMode for each AggregateExec after a shuffle stage resolves and rewrites the operator when the derived mode differs from the cached one.

Problem

DataFusion freezes InputOrderMode (Linear / Sorted / PartiallySorted) at plan time. In Ballista's AQE this creates two issues:

  1. Wasted memory: a downstream aggregate stays Linear (hash table, O(distinct-groups) memory) even when an upstream stage completes and grants ordering on the group-by columns — where Sorted (streaming, O(1) memory) would suffice.
  2. Stale correctness assumption: after a subtree rewrite, an aggregate may hold a Sorted claim when its input is no longer ordered.

Approach

AggregateExec::with_new_children already calls try_new_with_schema, which re-derives input_order_mode from the current input EquivalenceProperties. The rule does the following:

  1. Walks the plan with transform_up.
  2. At each AggregateExec, skips if no resolved ExchangeExec exists in the subtree (idempotence guard).
  3. Calls with_new_children([same_input]) to force re-derivation.
  4. Returns Transformed::yes(rebuilt) only when the derived mode differs from the cached one.

No upstream DataFusion changes are required.
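
The four steps can be sketched with a toy plan tree. This is only an illustration of the intended control flow: `Plan`, `Mode`, `derive_mode`, and `apply_rule` are hypothetical stand-ins written for this example, not the DataFusion or Ballista types, and the cached mode is set by hand so that a mismatch exists.

```rust
// Toy model: every type here is a hypothetical stand-in, not the DataFusion or Ballista API.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Linear,
    Sorted,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    // Leaf; `sorted` says whether its output is ordered on the group key.
    Scan { sorted: bool },
    // Shuffle boundary; `resolved` is true once the upstream stage has finished.
    Exchange { resolved: bool, input: Box<Plan> },
    // Aggregate carrying the mode that was cached when the node was built.
    Aggregate { cached: Mode, input: Box<Plan> },
}

// Toy assumption: ordering claims pass through every node unchanged.
fn advertises_order(plan: &Plan) -> bool {
    match plan {
        Plan::Scan { sorted } => *sorted,
        Plan::Exchange { input, .. } => advertises_order(input),
        Plan::Aggregate { input, .. } => advertises_order(input),
    }
}

// Step 2 guard: the nearest exchange below the node decides.
fn has_resolved_exchange(plan: &Plan) -> bool {
    match plan {
        Plan::Scan { .. } => false,
        Plan::Exchange { resolved, .. } => *resolved,
        Plan::Aggregate { input, .. } => has_resolved_exchange(input),
    }
}

// Step 3: re-derive the mode from what the input currently advertises.
fn derive_mode(input: &Plan) -> Mode {
    if advertises_order(input) { Mode::Sorted } else { Mode::Linear }
}

// Steps 1 and 4: bottom-up walk that reports a change only when the mode flips.
fn apply_rule(plan: Plan) -> (Plan, bool) {
    match plan {
        Plan::Scan { sorted } => (Plan::Scan { sorted }, false),
        Plan::Exchange { resolved, input } => {
            let (input, changed) = apply_rule(*input);
            (Plan::Exchange { resolved, input: Box::new(input) }, changed)
        }
        Plan::Aggregate { cached, input } => {
            let (input, changed) = apply_rule(*input);
            if !has_resolved_exchange(&input) {
                return (Plan::Aggregate { cached, input: Box::new(input) }, changed);
            }
            let derived = derive_mode(&input);
            let flipped = derived != cached;
            (Plan::Aggregate { cached: derived, input: Box::new(input) }, changed || flipped)
        }
    }
}
```

Applying `apply_rule` a second time to its own output reports no change, which is the idempotence property the checklist tests for.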

Configuration

SET ballista.aqe.dynamic_aggregate.enabled = true;

Disabled by default (false) pending benchmarking.

Checklist

  • New rule registered in default_optimizers() before DistributedExchangeRule
  • Config key ballista.aqe.dynamic_aggregate.enabled added to BallistaConfig
  • 9 unit tests: gate off, no-exchange skip, unresolved-exchange skip, sorted/unsorted input, schema preservation, idempotence
  • All 42 AQE optimizer rule tests pass

Add `DynamicAggregateAlgorithmRule`, an AQE physical optimizer rule that
re-derives `InputOrderMode` for each `AggregateExec` after a shuffle stage
resolves and rewrites the operator when the derived mode differs from the
cached one.

DataFusion freezes `InputOrderMode` (Linear/Sorted/PartiallySorted) at plan
time. In Ballista's AQE, this means a downstream aggregate stays in Linear
(hash table) mode even when an upstream stage completes and grants ordering on
the group-by columns — wasting memory that streaming aggregation would avoid.
The rule also corrects the reverse: a stale Sorted claim after a subtree
rewrite removes the assumed ordering.

The rewrite relies on `AggregateExec::with_new_children` which internally
calls `try_new_with_schema`, re-running the derivation against the current
input `EquivalenceProperties`. No upstream DataFusion changes are required.

Changes:
- `ballista/core/src/config.rs`: add `ballista.aqe.dynamic_aggregate.enabled`
  config key (default: false) with getter `aqe_dynamic_aggregate_enabled()`
- `optimizer_rule/dynamic_aggregate_algorithm.rs`: new rule with 9 unit tests
  (gate, no-exchange/unresolved-exchange skip, mode transitions, idempotence,
  schema preservation)
- `optimizer_rule/mod.rs`: expose new module
- `planner.rs`: register rule before `DistributedExchangeRule`

Closes part of apache#1359
@wirybeaver wirybeaver changed the title from "feat(aqe): dynamic streaming↔hash aggregation switch" to "feat(aqe): dynamic switching from streaming agg to hash aggregation" on May 18, 2026
/// want to know if *any* upstream stage has finished.
fn subtree_has_resolved_exchange(plan: &Arc<dyn ExecutionPlan>) -> bool {
    if let Some(exchange) = plan.as_any().downcast_ref::<ExchangeExec>() {
        exchange.shuffle_created() && !exchange.inactive_stage
Member

The doc comment above says "Recurses through all nodes, including through exchange boundaries", but the implementation stops recursing at the first ExchangeExec.

Author

You're right — the doc contradicts the implementation. The code stops at the first ExchangeExec and returns based on its resolved state, which is what we actually want (the nearest exchange gates the agg's input; a resolved exchange further down behind an unresolved one doesn't help). The doc should describe the actual behavior.
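
To make the difference concrete, here is a small sketch of the two traversals. `Node` and both functions are hypothetical toys written for this comment, and the use of `any` over children is an assumption about how non-exchange nodes are handled, not a copy of the PR code.

```rust
// Hypothetical toy plan node; not the Ballista ExchangeExec or DataFusion ExecutionPlan.
enum Node {
    Exchange { resolved: bool, child: Box<Node> },
    Other { children: Vec<Node> },
}

// What the code does: stop at the first exchange on each path and report its state.
fn nearest_exchange_resolved(node: &Node) -> bool {
    match node {
        Node::Exchange { resolved, .. } => *resolved,
        Node::Other { children } => children.iter().any(nearest_exchange_resolved),
    }
}

// What the doc comment describes: keep descending through exchange boundaries.
fn any_exchange_resolved(node: &Node) -> bool {
    match node {
        Node::Exchange { resolved, child } => *resolved || any_exchange_resolved(child),
        Node::Other { children } => children.iter().any(any_exchange_resolved),
    }
}
```

For a plan where an unresolved exchange sits above a resolved one, the first function returns false and the second returns true, which is exactly the case where the two descriptions disagree.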

That said, please see my reply to @milenkovicm on the main thread — the broader design has problems that may make this PR moot.

let result_agg = result.as_any().downcast_ref::<AggregateExec>().unwrap();

// After the rule the mode must match what re-derivation produces.
assert_eq!(result_agg.input_order_mode(), &original_mode);
Member

The test should assert that the order mode is Sorted.

Author

Right — the assertion is missing, but more importantly the test as written can't exercise a Linear→Sorted transition. aggregate_on_k(exchange) calls AggregateExec::try_new, which derives input_order_mode from the exchange's eq_properties at construction time. Since the exchange wraps a SortExec(k) and propagates its sort ordering, the agg is derived as Sorted immediately. The rule then re-derives the same mode, producing no transition — so the test passes trivially and doesn't validate what its name claims. Full root-cause analysis in my reply to @milenkovicm.

#[test]
fn schema_check_is_false() {
    assert!(!DynamicAggregateAlgorithmRule::default().schema_check());
}
Member

None of the tests validates an order mode change from Linear to Sorted.

Author

Confirmed — and this gap isn't just test coverage. with_new_children re-derivation produces the same mode the constructor would, because ExchangeExec.equivalence_properties() doesn't change between construction and resolution. So the rule can't, by design, produce a Linear↔Sorted transition. Details in my reply to @milenkovicm.

@milenkovicm
Contributor

Sorry, but I do not understand this PR. Can you please explain in a bit more detail what you are trying to do?

In which cases can we assume sorting is going to be preserved after an exchange?

@milenkovicm
Contributor

Wasted memory: a downstream aggregate stays Linear (hash table, O(distinct-groups) memory) even when an upstream stage completes and grants ordering on the group-by columns — where Sorted (streaming, O(1) memory) would suffice.

In which cases will this happen? When can we guarantee that sorting is preserved across shuffles?

Stale correctness assumption: after a subtree rewrite, an aggregate may hold a Sorted claim when its input is no longer ordered.

Can you please give an example of when this claim would be true?

@wirybeaver wirybeaver marked this pull request as draft May 21, 2026 07:10
@wirybeaver
Author

@milenkovicm Thanks for pushing back — your questions exposed that my reasoning was wrong. Let me walk through what I now see.

The flawed premise. I assumed ExchangeExec::equivalence_properties() would reflect post-shuffle ordering once the exchange resolves. It doesn't. From ballista/scheduler/src/state/aqe/execution_plan.rs:118:

let eq_properties = input.properties().eq_properties.clone();

The eq_properties are captured at construction time from the pre-shuffle input and never updated when resolve_shuffle_partitions() is called.
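
A toy reproduction of that snapshot behavior, as I understand it: `ToyExchange`, `EqProps`, and `derive_mode` are hypothetical stand-ins for illustration, not the Ballista types. Because the properties are cloned once in the constructor and `resolve` does not touch them, deriving the mode before and after resolution gives the same answer.

```rust
// Hypothetical stand-ins for illustration; not the Ballista or DataFusion types.
#[derive(Debug, Clone, PartialEq)]
struct EqProps {
    // Column the data is claimed to be sorted on, if any.
    sorted_on: Option<String>,
}

struct ToyExchange {
    // Snapshot taken once in `new`, from the pre-shuffle input.
    eq_properties: EqProps,
    resolved: bool,
}

impl ToyExchange {
    fn new(input_props: &EqProps) -> Self {
        ToyExchange { eq_properties: input_props.clone(), resolved: false }
    }

    // Marks the stage as finished but leaves the captured properties alone.
    fn resolve(&mut self) {
        self.resolved = true;
    }
}

#[derive(Debug, PartialEq)]
enum Mode {
    Linear,
    Sorted,
}

// Picks Sorted only when the advertised ordering matches the group key.
fn derive_mode(props: &EqProps, group_key: &str) -> Mode {
    match &props.sorted_on {
        Some(col) if col.as_str() == group_key => Mode::Sorted,
        _ => Mode::Linear,
    }
}
```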

Why the rule is a no-op (or worse).

  1. Linear → Sorted can't usefully fire. If the agg's input advertises sort properties, those were visible at the agg's construction time too — try_new_with_schema derived the right mode then. The rule's with_new_children re-derivation produces the same mode.

  2. When it would fire, it would be incorrect. A hash repartition wrapping a SortExec carries the sort eq_properties on the exchange, even though the actual post-shuffle data is no longer globally sorted (hash shuffle destroys global ordering). Switching the agg to Sorted based on those properties would cause streaming aggregation to produce wrong results.

  3. In valid replan scenarios it's redundant. If a sibling rule changes the agg's subtree during transform_up, the framework already calls with_new_children on the parent, which re-derives the mode — no separate rule needed.
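
To illustrate point 2, here is a minimal sketch of why a wrong Sorted claim is a correctness problem and not just a performance one. It assumes a post-shuffle partition receives rows from several upstream partitions interleaved; `streaming_sum` and `hash_sum` are toy functions written for this example, not DataFusion operators.

```rust
use std::collections::HashMap;

// Streaming (Sorted-mode) sum: closes a group whenever the key changes,
// so it keeps only one running group in memory.
fn streaming_sum(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut out = Vec::new();
    let mut current: Option<(String, i64)> = None;
    for &(key, value) in rows {
        let same_group = matches!(&current, Some((k, _)) if k.as_str() == key);
        if same_group {
            if let Some((_, total)) = current.as_mut() {
                *total += value;
            }
        } else {
            // Key changed: emit the finished group and start a new one.
            if let Some(done) = current.take() {
                out.push(done);
            }
            current = Some((key.to_string(), value));
        }
    }
    if let Some(done) = current {
        out.push(done);
    }
    out
}

// Hash (Linear-mode) sum: one table entry per distinct key, independent of input order.
fn hash_sum(rows: &[(&str, i64)]) -> HashMap<String, i64> {
    let mut totals = HashMap::new();
    for &(key, value) in rows {
        *totals.entry(key.to_string()).or_insert(0) += value;
    }
    totals
}
```

On input sorted by key, both functions agree. On an interleaved input such as `a, b, a`, the streaming version emits the `a` group twice with partial sums, while the hash version still produces one total per key.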

What the roadmap line actually requires.

For AQE to re-derive useful information after a stage resolves, the resolved exchange (or whatever node represents the post-shuffle reader) needs to advertise post-shuffle properties — driven by shuffle mode:

  • Hash shuffle: destroys global ordering, only preserves partitioning on the hash key
  • Sort-based shuffle: preserves per-partition ordering on the sort key
  • Single-partition coalesce: preserves input ordering

Once those properties reflect reality, no dedicated re-derivation rule is needed — with_new_children during normal optimizer passes naturally picks up the change.
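
As a rough sketch of what "translate properties on resolve" could look like, the mapping below encodes the three cases listed above. All names (`ShuffleMode`, `PostShuffleProps`, `post_shuffle_props`) are hypothetical and chosen for illustration; nothing here exists in Ballista today, and the partitioning left empty for the sort-based and coalesce cases is a simplification.

```rust
// Hypothetical names for illustration only; not existing Ballista types.
#[derive(Debug, Clone, PartialEq)]
enum ShuffleMode {
    Hash { keys: Vec<String> },
    SortBased { sort_keys: Vec<String> },
    SinglePartitionCoalesce,
}

#[derive(Debug, Clone, PartialEq)]
struct PostShuffleProps {
    // Columns the output is partitioned on (empty if none is claimed).
    partitioned_on: Vec<String>,
    // Per-partition ordering the reader may rely on (empty if none).
    ordered_on: Vec<String>,
}

// Translates pre-shuffle ordering into what the post-shuffle reader can advertise.
fn post_shuffle_props(mode: &ShuffleMode, input_ordering: &[String]) -> PostShuffleProps {
    match mode {
        // Hash shuffle: ordering is lost, only hash partitioning survives.
        ShuffleMode::Hash { keys } => PostShuffleProps {
            partitioned_on: keys.clone(),
            ordered_on: Vec::new(),
        },
        // Sort-based shuffle: each partition stays ordered on the sort key.
        ShuffleMode::SortBased { sort_keys } => PostShuffleProps {
            partitioned_on: Vec::new(),
            ordered_on: sort_keys.clone(),
        },
        // Single-partition coalesce: the input ordering carries over.
        ShuffleMode::SinglePartitionCoalesce => PostShuffleProps {
            partitioned_on: Vec::new(),
            ordered_on: input_ordering.to_vec(),
        },
    }
}
```

With something along these lines, an aggregate above a hash shuffle would see an empty ordering and derive Linear, even if the pre-shuffle input was sorted on the group key.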

Proposal. Close this PR. I'd like to open a separate one (or RFC discussion) to update ExchangeExec.equivalence_properties() to translate properties on resolve based on shuffle mode. Happy to take direction from you on whether that's the right next step or if I'm still misreading the AQE design.

Apologies for the misfire — should have validated the premise with a runnable example before opening.

@wirybeaver
Author

Closing per my analysis above — the design is wrong at the foundation. Will reopen with a different approach (likely updating ExchangeExec.equivalence_properties() to reflect post-shuffle reality on resolve) once direction is settled.

@wirybeaver wirybeaver closed this May 21, 2026

3 participants