Conversation

@ahmed-mez (Contributor) commented Nov 24, 2025

Which issue does this PR close?

Rationale for this change

When a GroupedHashAggregateStream finishes processing its input, it emits the accumulated groups. The current implementation materializes all groups into a single RecordBatch (via emit(EmitTo::All)) before slicing it into smaller batches.

For queries with high-cardinality grouping keys (e.g., >500k groups) or complex types (Strings, Lists), this single emission step becomes a blocking operation that can stall the async runtime for seconds. This "long poll" prevents other tasks from running, leading to latency spikes and reduced concurrency.

This PR changes the emission strategy to respect the configured batch_size during the drain phase, emitting groups incrementally instead of all at once.

What changes are included in this PR?

  1. Incremental Emission Logic:

    • Introduced a new ExecutionState::DrainingGroups in GroupedHashAggregateStream.
    • Modified poll_next to handle this state by emitting batch_size groups at a time (see the sketch after this list).
    • Added input_done() to the GroupValues trait to signal the transition to drain mode.
  2. GroupValues Updates:

    • Implemented input_done() in GroupValues implementations (e.g., GroupValuesRows).
    • In GroupValuesRows, input_done() clears the hash map (to free memory immediately) and sets a drain_mode flag to optimize sequential emission.
  3. Test Case:

    • Added test_chunked_group_emission to verify that the chunked emission behavior works correctly:
      • Verifies that groups are emitted in multiple batches (not all at once).
      • Confirms that batch sizes respect the configured batch_size limit.
      • Ensures all groups are eventually emitted.
    • Added test_long_poll_reproducer to aggregates/mod.rs, which demonstrates the latency improvement (from ~2.8s to ~2.1s in local tests) and verifies that results are emitted in multiple small batches rather than one huge batch. This test case does not need to be committed; its purpose is mainly to demonstrate the issue and the fix.
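
To make the new control flow concrete, here is a minimal, self-contained sketch of the incremental emission described in item 1 above. It is illustrative only: the types use plain u64 keys instead of Arrow data, and only the names EmitTo, input_done, drain_mode, and batch_size come from the PR; the real logic lives in row_hash.rs and group_values/row.rs.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real GroupValues / ExecutionState machinery.
enum EmitTo {
    All,
    First(usize),
}

struct MockGroupValues {
    // Maps group key -> group index; only needed while input is still arriving.
    map: HashMap<u64, usize>,
    group_keys: Vec<u64>,
    drain_mode: bool,
}

impl MockGroupValues {
    /// Called once all input has been consumed: the hash map is no longer
    /// needed for lookups, so it is cleared to free memory immediately.
    fn input_done(&mut self) {
        self.map.clear();
        self.drain_mode = true;
    }

    /// Emits at most the requested number of groups, removing them from state.
    fn emit(&mut self, emit_to: EmitTo) -> Vec<u64> {
        let n = match emit_to {
            EmitTo::All => self.group_keys.len(),
            EmitTo::First(n) => n.min(self.group_keys.len()),
        };
        let out: Vec<u64> = self.group_keys.drain(..n).collect();
        if !self.drain_mode {
            // Pre-drain emission (memory pressure / sorted input) must also
            // re-index the remaining groups in the hash map -- the heavier
            // path that drain mode avoids entirely.
            for (i, key) in self.group_keys.iter().enumerate() {
                self.map.insert(*key, i);
            }
        }
        out
    }

    fn is_empty(&self) -> bool {
        self.group_keys.is_empty()
    }
}

fn main() {
    let batch_size = 3;
    let mut group_values = MockGroupValues {
        map: (0..10u64).map(|k| (k, k as usize)).collect(),
        group_keys: (0..10).collect(),
        drain_mode: false,
    };

    // Once input is exhausted the stream switches to the draining state...
    group_values.input_done();

    // ...and each poll emits only `batch_size` groups instead of materializing
    // everything via emit(EmitTo::All) in one long, blocking call.
    while !group_values.is_empty() {
        let batch = group_values.emit(EmitTo::First(batch_size));
        println!("emitted batch of {} groups", batch.len());
        // In the real stream, Poll::Ready(Some(batch)) is returned here, which
        // is a yield point that lets other tasks on the runtime get scheduled.
    }
}
```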

Are these changes tested?

Yes, two new test cases have been added.

Are there any user-facing changes?

  • Performance: Users running large aggregations should see lower latency to the first result and fewer "hiccups" (long stalls) in their query engine.
  • Memory: Hash maps are cleared slightly earlier (when input is done), potentially freeing memory sooner during emission.
  • API: No public API changes.

@github-actions bot added the physical-plan (Changes to the physical-plan crate) label Nov 24, 2025
@ahmed-mez changed the title from "Emit in chunks" to "Emit aggregation groups in chunks to avoid blocking async runtime" Nov 24, 2025
@ahmed-mez marked this pull request as ready for review November 24, 2025 10:11
self.emission_offset = end;
if self.emission_offset == group_values.num_rows() {
    group_values.clear();
    self.emission_offset = 0;
Contributor

I'm not sure if I fully understand this. Looking at the previous code, it does look like it was already emitting groups incrementally.

The difference I see is that in this new code we track the offset with self.emission_offset, but the other side of the if statement mutates self.group_values in place to trim the results that have already been emitted.

Contributor Author

The emission_offset optimization is just there to avoid the expensive "copy remaining rows" operation that the old EmitTo::First path did during input processing. The real fix is replacing emit(EmitTo::All) (which blocks for seconds on large group counts) with an incremental drain: https://github.com/apache/datafusion/pull/18906/files#diff-69c8ecaca5e2c7005f2ed1facaa41f80b45bfd006f2357e53ff3072f535c287dR1196
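
To illustrate the point above with a self-contained toy (not the actual row_hash.rs / group_values code; emit_by_trimming and emit_by_offset are invented names for this sketch): trimming the already-emitted prefix in place has to shift every remaining row, while tracking an offset cursor, as emission_offset does, only copies the rows actually being emitted.

```rust
/// Old-style emission during input processing: the emitted prefix is removed,
/// which shifts (copies) every remaining row on each emit.
fn emit_by_trimming(rows: &mut Vec<String>, n: usize) -> Vec<String> {
    let n = n.min(rows.len());
    rows.drain(..n).collect()
}

/// Drain-style emission: an offset cursor advances over an untouched buffer,
/// so nothing is moved except the rows being emitted.
fn emit_by_offset(rows: &[String], offset: &mut usize, n: usize) -> Vec<String> {
    let end = (*offset + n).min(rows.len());
    let out = rows[*offset..end].to_vec();
    *offset = end;
    out
}

fn main() {
    let mut rows: Vec<String> = (0..10).map(|i| format!("group-{i}")).collect();
    let batch = emit_by_trimming(&mut rows, 3);
    println!("trimmed emit: {batch:?}, {} rows shifted/remaining", rows.len());

    let rows: Vec<String> = (0..10).map(|i| format!("group-{i}")).collect();
    let mut offset = 0;
    let batch = emit_by_offset(&rows, &mut offset, 3);
    println!("offset emit: {batch:?}, offset now {offset}");
}
```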

Contributor

Doesn't that mean then that the else part (when drain_mode == false) is now dead code? FWIW I can put a panic!() here:

                    output
                } else {
+                   panic!("foo");
                    let groups_rows = group_values.iter().take(n);
                    let output = self.row_converter.convert_rows(groups_rows)?;

And all the tests in the crate pass

Contributor Author

I think it's still needed for emit_early_if_necessary() (Partial mode under memory pressure) and for group_ordering.emit_to() (when input is sorted or partially sorted). Those two cases can trigger incremental / early emission.

I added a test case for sorted input (c4903b6) that triggers that path.

@gabotechs (Contributor) commented Nov 26, 2025

I'll reframe. If during the benchmarks you see that the total execution time is the same with the old code vs this new one that executes with drain_mode == true, how about just leaving group_values/row.rs as it was?

If there's actually no noticeable performance improvement, we might save some lines of code and complexity by just keeping the old path here in group_values/row.rs, as the actual improvement is happening in row_hash.rs.

Contributor Author

I can give that a try, but it will be less efficient for sure. There is no value in retaining the state of the hash map when draining, and you can see that the drain_mode == false path is doing much heavier operations than the drain_mode == true path.

@alamb (Contributor) left a comment

Thanks @ahmed-mez -- do you have any measurements / benchmarks that illustrate how this change improves performance?

Your initial PR description in #18907 (comment) makes it sound like a separate thread pool may be more appropriate?

@ahmed-mez (Contributor Author)

I augmented the reproducer test case with some stats to clarify the benefit:

  • In chunked, we dramatically reduced poll times (23ms vs 2.88s previously) and provided many more yield points (i.e., opportunities for other async tasks to get scheduled by the runtime).
  • Total execution time is essentially the same (2.87s vs 2.88s); there is no performance overhead.

Note: The first poll in chunked (2.21s) includes input processing (building the hash table). This is unavoidable and the same in both approaches.
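
The reproducer gathers these stats by timing each poll of the aggregation stream. As a rough, std-only analogue of that measurement pattern (illustrative only, not the committed test; measure_polls and the iterator stand-in are invented for this sketch):

```rust
use std::time::{Duration, Instant};

// Times each item pulled from an iterator, the way the reproducer times each
// poll of the aggregation stream: many short pulls instead of one long stall.
fn measure_polls<I: Iterator>(iter: I) -> (usize, Duration) {
    let mut it = iter;
    let mut polls = 0;
    let mut max_poll = Duration::ZERO;
    loop {
        let start = Instant::now();
        let item = it.next();
        max_poll = max_poll.max(start.elapsed());
        polls += 1; // also counts the final pull that returns None
        if item.is_none() {
            break;
        }
    }
    (polls, max_poll)
}

fn main() {
    let (polls, max_poll) = measure_polls(0..1_000_000u64);
    println!("{polls} polls, longest single poll: {max_poll:?}");
}
```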

Comment on lines -1170 to +1196
- let batch = self.emit(EmitTo::All, false)?;
- batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
+ ExecutionState::DrainingGroups
Contributor

Ok, I think I understand better what's happening now, correct me if I'm wrong:

  • The previous implementation, upon finishing accumulating all groups, bundled everything into a big RecordBatch and then proceeded to yield slices of it, respecting the configured batch_size
  • The current implementation, upon finishing accumulating all groups, bundles nothing; instead, each RecordBatch of size batch_size is bundled on demand as the stream gets polled

Contributor

If that's the case, I imagine there's no reason to keep both modes of emitting outputs, right? Would there still be any situation where we want the previous behavior?

@ahmed-mez (Contributor Author) commented Nov 25, 2025

Ok, I think I understand better what's happening now, correct me if I'm wrong:

  • The previous implementation, upon finishing accumulating all groups, bundled everything into a big RecordBatch and then proceeded to yield slices of it, respecting the configured batch_size
  • The current implementation, upon finishing accumulating all groups, bundles nothing; instead, each RecordBatch of size batch_size is bundled on demand as the stream gets polled

Yes, that's pretty much it.

If that's the case, I imagine there's no reason to keep both modes of emitting outputs, right? Would there still be any situation where we want the previous behavior?

Are you referring to EmitTo::All vs EmitTo::First(n)? That could make sense; however, I also see EmitTo::All being used in many other paths (e.g. when spilling), so I'm not confident enough to make the call. Happy to update the code accordingly if it's recommended.

Contributor

I was more referring to ExecutionState::ProducingOutput and ExecutionState::DrainingGroups. If we are capable of draining the output by generating small chunks on demand, I imagine there's no value in also keeping the previous full-batch production.

Contributor Author

The way I see it, ExecutionState::ProducingOutput is not doing anything wrong per se; it's still useful for producing results while still reading input (e.g. in the sorted-input scenario), which requires different (more involved) handling compared to ExecutionState::DrainingGroups.

The fundamental problem was in the way set_input_done_and_produce_output emitted results, not in the ExecutionState::ProducingOutput implementation.

I do acknowledge, though, that the current PR adds some complexity to the code flow, and I'm happy to explore ways to simplify it.

@geoffreyclaude (Contributor)

Note: The first poll in chunked (2.21s) includes input processing (building the hash table). This is unavoidable and the same in both approaches.

Why is this unavoidable? Can't we insert yield points in the hash table build as well?

@ahmed-mez (Contributor Author) commented Nov 25, 2025

Note: The first poll in chunked (2.21s) includes input processing (building the hash table). This is unavoidable and the same in both approaches.

Why is this unavoidable? Can't we insert yield points in the hash table build as well?

One problem at a time; I'd like to keep the scope limited and easy to review 😅 Also, to be fair, I did see this particular overhead in the test but not in practice. In practice / prod-like environments, the majority of the stalls happen because of the emission.


Development

Successfully merging this pull request may close these issues:

Long poll (async runtime stall) in HashAggregate when emitting large number of groups