Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
Introduces a new “bucketed hash aggregation” execution path that fuses local+global aggregation into a single operator for single-BE deployments, aiming to reduce exchange and (de)serialization overhead. The change spans FE planning/rules, Thrift plan serialization, and BE pipeline operator implementation.
Changes:
- Add a new Thrift plan node type/struct and FE legacy `BucketedAggregationNode` to serialize a bucketed aggregation operator to BE.
- Add Nereids physical plan node (`PhysicalBucketedHashAggregate`) plus rule integration (`SplitAggWithoutDistinct`) and planner/translator/cost/stats/property plumbing.
- Add BE pipeline source/sink operators and shared state to build per-instance 256-bucket hash tables and merge/output them without exchange.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| gensrc/thrift/PlanNodes.thrift | Adds BUCKETED_AGGREGATION_NODE and TBucketedAggregationNode to the plan thrift. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds session vars to gate bucketed agg and control eligibility thresholds. |
| fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java | New legacy plan node to serialize bucketed agg to Thrift. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java | Adds visitor hook for the new physical bucketed aggregate. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java | Defines the new Nereids physical operator. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java | Adds PHYSICAL_BUCKETED_HASH_AGGREGATE plan type. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java | Wires stats derivation for bucketed aggregate. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java | Generates bucketed agg candidate plan under session/cluster/stats constraints. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java | Adds property derivation behavior for bucketed aggregate. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java | Adds child output property derivation for bucketed aggregate. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazySlotPruning.java | Adds visitor handling for the new physical node. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java | Extends RF propagation logic to bucketed aggregate. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java | Refactors CSE projection logic to work for both hash agg and bucketed hash agg. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java | Translates bucketed aggregate to legacy plan node + refactors shared agg translation helpers. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java | Adds cost model entry for bucketed aggregate. |
| be/src/exec/pipeline/pipeline_fragment_context.cpp | Creates bucketed agg source/sink pipelines and registers shared state/dependencies. |
| be/src/exec/pipeline/dependency.h | Adds BucketedAggSharedState and related bucket merge state machinery. |
| be/src/exec/pipeline/dependency.cpp | Implements aggregate-state destroy helper for bucketed agg shared state. |
| be/src/exec/operator/operator.cpp | Registers local state templates for new bucketed agg sink/source operators. |
| be/src/exec/operator/bucketed_aggregation_source_operator.h/.cpp | Implements source-side pipelined merge and output across buckets/instances. |
| be/src/exec/operator/bucketed_aggregation_sink_operator.h/.cpp | Implements sink-side build into per-bucket hash tables and wakeup logic. |
| be/src/exec/common/hash_table/hash_map_context.h | Adds reusable output_keys buffer and adjusts string-key emplace helpers. |
| be/src/exec/common/agg_utils.h | Refactors agg hash-table variants and adds bucketed-agg-specific variants using PHHashMap for strings. |
```java
List<TExpr> aggregateFunctions = Lists.newArrayList();
for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
```

```java
// Add bucketed agg candidate when enabled and on single BE with GROUP BY keys
candidates.addAll(implementBucketedPhase(aggregate, ctx));
return candidates.build();
```
/review |
Code Review: Bucketed Hash Aggregation (#61469)
Thorough review of 25 files, +2635/-91 lines. The feature adds a new BucketedAggSinkOperatorX / BucketedAggSourceOperatorX operator pair that fuses two-phase aggregation into a single operator for single-BE deployments, eliminating exchange overhead.
Architecture Assessment
The overall design is sound:
- Pre-registered shared state pattern (same as broadcast hash join) correctly enables cross-instance visibility
- Per-bucket CAS lock + generation counter for missed-wakeup detection is a solid concurrency design
- `PHHashMap<StringRef>` instead of `StringHashMap` for string keys simplifies the emplace interface
- Pipelined merge model (sources wake up as sinks finish) avoids blocking on all sinks
- Feature is disabled by default (`enableBucketedHashAgg = false`), appropriate for initial landing
Critical Checkpoints
Goal accomplished? Yes, the code implements the described bucketed hash aggregation operator. No tests prove it — see below.
Modification minimal and focused? Mostly yes. The changes are well-scoped to the new feature with appropriate integration points.
Concurrency: The CAS-based bucket locking, state_generation counter for missed wakeups, std::call_once for init, and per-instance data isolation are all correctly designed. The close() method properly releases held CAS locks on premature shutdown. Memory ordering on atomics is appropriate.
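The missed-wakeup pattern described above can be sketched as follows. This is an illustrative model only — `BucketSlot`, `try_acquire`, and `publish` are stand-in names, not the actual Doris BE types: a merger that loses the CAS snapshots the generation counter before merging and re-checks it afterwards, so a publish that raced with the merge is not lost.

```cpp
// Illustrative sketch (not the actual Doris BE code) of a per-bucket CAS
// lock plus a generation counter used to detect missed wakeups.
#include <atomic>
#include <cassert>
#include <cstdint>

struct BucketSlot {
    std::atomic<bool> locked{false};
    // Bumped on every publish; a merger snapshots it before merging and
    // re-checks it afterwards to see whether new data arrived meanwhile.
    std::atomic<uint64_t> generation{0};
};

// Returns true if this thread won exclusive access to the bucket.
bool try_acquire(BucketSlot& b) {
    bool expected = false;
    return b.locked.compare_exchange_strong(expected, true,
                                            std::memory_order_acquire);
}

void release(BucketSlot& b) {
    b.locked.store(false, std::memory_order_release);
}

// Sink side: publish new per-bucket data and bump the generation so a
// concurrent merger that observed the old value knows to retry.
void publish(BucketSlot& b) {
    b.generation.fetch_add(1, std::memory_order_acq_rel);
}
```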
Lifecycle management: Null key lifecycle is correct — cross-instance merge transfers ownership, cross-bucket merge in output phase transfers to bucket 0, and _close_one_agg_data correctly destroys remaining states. The merge_null_key function properly nulls out source entries after merge.
Configuration items: Three new session variables with appropriate defaults and `needForward = true`. Value checkers rejecting negative inputs are missing, but the risk is low.
Incompatible changes: New Thrift TBucketedAggregationNode and BUCKETED_AGGREGATION_NODE enum are additive-only. No compatibility issues during rolling upgrade since FE gates the feature.
Parallel code paths: The hash_map_context.h const auto& → auto change for origin is safe — the copy is needed because lazy_emplace_with_origin may modify the key during arena persistence.
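A minimal standalone demo of why that copy matters, under the assumed semantics that the emplace helper may repoint the key into the arena. `StringRef` and `persist_into_arena` here are simplified stand-ins, not the real `hash_map_context.h` types:

```cpp
// Demo: a reference to the key observes the arena rewrite, while a value
// copy (the `auto` in the reviewed change) preserves the original view.
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

struct StringRef {
    const char* data;
    size_t size;
};

// Stand-in for arena persistence: rewrites `key` to point at arena-owned
// storage, as lazy_emplace_with_origin is described to do above.
void persist_into_arena(StringRef& key, std::vector<std::string>& arena) {
    arena.emplace_back(key.data, key.size);
    key.data = arena.back().data();
}
```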
Test coverage: ZERO. This is the most significant gap — see inline comment.
Observability: Appropriate timer/counter metrics added (BuildTime, MergeTime, HashTableComputeTime, etc.). No VLOG or structured logging for debugging merge coordination issues.
Transaction/persistence: N/A — pure query-side feature.
FE-BE variable passing: Thrift struct correctly propagated. All fields set in BucketedAggregationNode.toThrift().
Performance: The two-phase bucket approach (pre-group by bucket index, then batch-emplace per bucket) provides good cache locality. get_reserve_mem_size() returns 0 which means no memory reservation/backpressure — acceptable for initial version but should be addressed before enabling by default.
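As a rough model of the two-phase approach — a simplified sketch with `std::unordered_map` standing in for the real per-bucket aggregated hash tables, counting rows per key:

```cpp
// Sketch of the two-phase bucket approach: phase 1 groups row indices by
// bucket, phase 2 batch-emplaces each bucket so consecutive inserts hit
// the same table (better cache locality than interleaved inserts).
#include <array>
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

constexpr size_t kNumBuckets = 256;

std::array<std::unordered_map<std::string, int64_t>, kNumBuckets>
aggregate_counts(const std::vector<std::string>& keys) {
    // Phase 1: pre-group row indices by bucket index.
    std::array<std::vector<uint32_t>, kNumBuckets> rows_in_bucket;
    for (uint32_t i = 0; i < keys.size(); ++i) {
        size_t b = std::hash<std::string>{}(keys[i]) % kNumBuckets;
        rows_in_bucket[b].push_back(i);
    }
    // Phase 2: batch-emplace per bucket.
    std::array<std::unordered_map<std::string, int64_t>, kNumBuckets> tables;
    for (size_t b = 0; b < kNumBuckets; ++b) {
        for (uint32_t i : rows_in_bucket[b]) {
            ++tables[b][keys[i]];
        }
    }
    return tables;
}
```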
Summary of Issues
| # | Severity | Description |
|---|---|---|
| 1 | High | No regression tests, BE unit tests, or FE unit tests |
| 2 | Medium | _close_one_agg_data destroys null key data without checking if it was already consumed by merge |
| 3 | Medium | get_reserve_mem_size() returns 0 — no memory backpressure |
| 4 | Low | Several FE visitor/processor classes use instanceof PhysicalHashAggregate which won't match PhysicalBucketedHashAggregate |
| 5 | Low | PhysicalBucketedHashAggregate.computeUnique() missing containsUniqueFunction check |
See inline comments for details.
```cpp
if constexpr (std::is_assignable_v<decltype(data.has_null_key_data()), bool>) {
    if (data.has_null_key_data()) {
        _destroy_agg_status(
```
Potential double-destroy of null key aggregate state
In _close_one_agg_data, the null key's aggregate state is unconditionally destroyed if has_null_key_data() is true. However, during the source-side merge (_merge_bucket → merge_null_key), when both source and destination have null keys, the source null key state is merged INTO the destination and then destroyed via merge_agg_states (which calls destroy on the source state). After merge, set_null_key_agg_data(src_data, nullptr) is called, BUT the null_key_data pointer being set to nullptr does NOT affect has_null_key_data() which is a separate flag.
So for a non-merge-target instance whose null key was merged (case 2 in merge_null_key): has_null_key_data() is set to false by set_null_key_flag(src_data, false) — so this case is actually safe.
BUT there's a subtler case: if _close is called before source-side merge completes (e.g., query cancellation), the merge target's null key might be destroyed here AND simultaneously accessed by the merge function. Consider adding a comment documenting the assumed ordering: _close is only called after all source/sink instances have been closed, which happens after the pipeline is fully finished.
Also: the null key data should be checked for non-null before _destroy_agg_status, similar to how the for_each_mapped lambda checks if (mapped). Currently data.template get_null_key_data<AggregateDataPtr>() could be nullptr if no null key was ever inserted but has_null_key_data() was somehow set (e.g., by merge_null_key case 1 which calls set_null_key_flag(dst, true) + set_null_key_agg_data(dst, src_null) — if src_null happened to be nullptr, we'd set flag=true with data=nullptr).
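A hedged sketch of the suggested guard — checking the data pointer as well as the flag before destroying — using illustrative stand-in types rather than the real shared-state structures:

```cpp
// Stand-in types; the real code holds these inside the hash table variant.
#include <cassert>

using AggregateDataPtr = char*;

struct NullKeyState {
    bool has_null_key = false;           // what set_null_key_flag() sets
    AggregateDataPtr null_key_data = nullptr;
};

int g_destroy_calls = 0;  // instrumentation for the sketch only
void destroy_agg_status(AggregateDataPtr) { ++g_destroy_calls; }

void close_null_key(NullKeyState& s) {
    // Guard on the pointer as well as the flag: per the review above,
    // merge_null_key can leave the flag set while the pointer is null.
    if (s.has_null_key && s.null_key_data != nullptr) {
        destroy_agg_status(s.null_key_data);
        s.null_key_data = nullptr;
        s.has_null_key = false;
    }
}
```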
```java
 * 5. Data volume checks pass (min input rows, max group keys)
 */
private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
    if (!ctx.getSessionVariable().enableBucketedHashAgg) {
```
No test coverage for this new feature
This PR adds a significant feature (4 new BE operator files, 2 new FE plan nodes, 15+ FE integration points) with zero tests:
- No regression tests exercising `set enable_bucketed_hash_agg=true` with GROUP BY queries
- No BE unit tests for bucket assignment, cross-instance merge, null key handling, or simple_count optimization
- No FE unit tests for the eligibility checks in `implementBucketedPhase`
Per AGENTS.md: "All kernel features must have corresponding tests. Prioritize adding regression tests under regression-test/." At minimum, the feature needs:
- Regression tests verifying correct results with bucketed agg enabled (various GROUP BY types, nullable keys, HAVING, LIMIT)
- Regression tests verifying the plan shape (EXPLAIN shows `BUCKETED_AGGREGATION_NODE`)
- FE unit tests for the eligibility logic (single-BE check, min-rows threshold, max-group-keys threshold)
```cpp
}

size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { return 0; }
```
get_reserve_mem_size() returns 0 — no memory backpressure
The existing AggSinkOperatorX::get_reserve_mem_size() participates in memory reservation to enable backpressure when the hash table grows large. Returning 0 here means the bucketed aggregation operator has no memory limit enforcement — if the hash tables grow beyond available memory, there's no spill mechanism and no reservation-based backpressure.
This is acceptable for an initial version that's disabled by default, but should be addressed before the feature is enabled by default. Consider at least tracking total memory usage across all 256 × N_instances hash tables and integrating with the memory reservation framework. A TODO comment here would be helpful to track this.
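One possible shape for that tracking, sketched with hypothetical names (nothing here matches actual Doris APIs): an atomic byte counter shared by all bucket hash tables, whose current footprint plus a growth margin could feed a reservation estimate instead of a flat 0.

```cpp
// Hypothetical sketch of memory tracking across all bucket hash tables.
#include <atomic>
#include <cassert>
#include <cstddef>

struct BucketedAggMemTracker {
    std::atomic<size_t> total_bytes{0};

    void add(size_t n)     { total_bytes.fetch_add(n, std::memory_order_relaxed); }
    void release(size_t n) { total_bytes.fetch_sub(n, std::memory_order_relaxed); }

    // A get_reserve_mem_size() override could return the current footprint
    // plus an estimated growth margin rather than 0, restoring backpressure.
    size_t reserve_estimate(double growth_margin = 0.5) const {
        size_t cur = total_bytes.load(std::memory_order_relaxed);
        return cur + static_cast<size_t>(cur * growth_margin);
    }
};
```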
```java
public void computeUnique(DataTrait.Builder builder) {
    DataTrait childFd = child(0).getLogicalProperties().getTrait();

    ImmutableSet.Builder<Slot> groupByKeysBuilder = ImmutableSet.builder();
```
Missing containsUniqueFunction check in computeUnique()
Comparing with PhysicalHashAggregate.computeUnique() (line 407), which checks containsUniqueFunction(groupByExpressions) — if any GROUP BY expression contains a unique function (e.g., random()), the output is not unique even if all group keys are present. The bucketed variant misses this check.
Same issue exists in computeUniform(). While these are optimizer property derivation gaps (won't cause wrong results), they could lead to incorrect plan optimization in edge cases.
```java
    return inputPlanFragment;
}
```
Missing SessionVarGuardExpr handling in aggregate expression collection
Comparing with the regular hash agg translator (lines 1198-1227), the bucketed agg translator at lines 1338-1351 does not handle SessionVarGuardExpr wrapping. While the current code path likely doesn't produce SessionVarGuardExpr (since bucketed agg uses GLOBAL_RESULT mode), this is a latent inconsistency that could cause ClassCastException if the code evolves.
Also note: BucketedAggregationNode.toThrift() is missing the updateMaterializedSlots() call that the regular AggregationNode.toThrift() makes (line 110 of AggregationNode.java). In the Nereids path this is typically a no-op, but worth adding for completeness.
update plan
update simple count and ph string map
update fix
update refact
update
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)