[Feature](agg) support bucketed agg operator #61495
BiteTheDDDDt wants to merge 5 commits into apache:master
Conversation

Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall
Pull request overview
This PR introduces a new “bucketed hash aggregation” optimization path that fuses local+global aggregation into a single operator for single-BE deployments, along with the required FE/BE plan and pipeline support.
Changes:
- Add a new Thrift plan node type + payload (`BUCKETED_AGGREGATION_NODE`/`TBucketedAggregationNode`) and wire it into `TPlanNode`.
- Add Nereids physical plan + translation + costing to generate and pick `PhysicalBucketedHashAggregate`.
- Implement BE pipeline sink/source operators and shared state to build per-instance bucketed hash tables and merge/output them without exchange.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| gensrc/thrift/PlanNodes.thrift | Adds new plan node type and Thrift struct for bucketed aggregation |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds session variables controlling the optimization and thresholds |
| fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java | Adds legacy planner node that serializes bucketed agg into Thrift |
| fe/fe-core/src/main/java/org/apache/doris/nereids/** | Adds new physical node, visitor hooks, properties, stats, cost model, and implementation rule |
| be/src/exec/pipeline/pipeline_fragment_context.cpp | Creates bucketed agg source/sink pipelines and registers shared state |
| be/src/exec/pipeline/dependency.{h,cpp} | Adds BucketedAggSharedState and cleanup/destroy support |
| be/src/exec/operator/operator.cpp | Registers new bucketed agg pipeline local states |
| be/src/exec/operator/bucketed_aggregation_* | Implements bucketed agg sink/source operators |
| be/src/exec/common/hash_table/hash_map_context.h | Adds reusable output buffer to hash method state |
| be/src/exec/common/agg_utils.h | Factors agg hash-table variants and adds BucketedAggDataVariants |
```java
+ "消除 Exchange 开销和序列化/反序列化成本。默认关闭。",
  "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase "
+ "aggregation into a single operator on single-BE deployments, eliminating exchange overhead "
+ "and serialization/deserialization costs. Disabled by default."})
```
```java
TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode();
bucketedAggNode.setGroupingExprs(groupingExprs);
bucketedAggNode.setAggregateFunctions(aggregateFunctions);
bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());
```
```thrift
struct TBucketedAggregationNode {
  1: optional list<Exprs.TExpr> grouping_exprs
  2: required list<Exprs.TExpr> aggregate_functions
  3: required Types.TTupleId intermediate_tuple_id
  4: required Types.TTupleId output_tuple_id
  5: required bool need_finalize
}
```
```java
    return ImmutableList.of();
}
// Only for single-BE deployments
int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
```
```cpp
op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
```
```cpp
for (int i = 0; i < _num_instances; i++) {
    auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
                                                 "BUCKETED_AGG_SINK_DEPENDENCY");
    sink_dep->set_shared_state(shared_state.get());
    shared_state->sink_deps.push_back(sink_dep);
}
shared_state->create_source_dependencies(_num_instances, op->operator_id(),
```
```java
if (expr instanceof SlotReference) {
    SlotReference slot = (SlotReference) expr;
    if (slot.getOriginalColumn().isPresent()) {
        groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
    }
}
// All distribution columns must appear in the GROUP BY keys
for (Column column : distributionColumns) {
    if (!groupByColumnNames.contains(column.getName().toLowerCase())) {
```
BE Regression && UT Coverage Report: increment line coverage, increment coverage report

/review
Bucketed Hash Aggregation Review — Code Review Skill Checkpoint Analysis
Part 1.3 Critical Checkpoint Conclusions
Goal: Fuse two-phase (local + global) aggregation into a single operator for single-BE deployments using 256 per-bucket hash tables. The code achieves this goal structurally, but has critical correctness and safety issues.
Concurrency: Sink instances write to independent per_instance_data[task_idx] (safe). Source uses per-bucket CAS locking (safe). state_generation fence-based wakeup is correct. However, shared probe_expr_ctxs are called concurrently without per-instance cloning — see inline comment.
Memory Safety: get_reserve_mem_size() returns 0, completely bypassing pipeline memory reservation. With 256 hash tables × N instances, uncontrolled memory growth is possible. No spill path exists as fallback. No SCOPED_PEAK_MEM instrumentation. See inline comment.
Test Coverage: Zero tests — no regression tests, no BE unit tests, no FE unit tests for ~2700 lines of new production code across FE, BE, and Thrift.
Configuration: 3 new session variables added (enable_bucketed_hash_agg, etc.) — appropriately gated.
FE-BE Protocol: New TPlanNodeType::BUCKETED_AGGREGATION_NODE = 37 properly handled in both FE (BucketedAggregationNode) and BE (pipeline_fragment_context.cpp).
Incompatible Changes: None — new node type, feature-gated, no existing behavior modified.
Observability: Profile counters added for hash table size, memory, build/expr/compute/emplace timers. Adequate.
Other observations:

- The `simple_count` optimization uses `reinterpret_cast<UInt64&>` on `AggregateDataPtr` to store counters in pointer bits: correct but subtle; add a comment explaining the aliasing.
- `is_fixed_key` member added to `AggregatedDataVariants` but never read in the diff.
- `close()` properly releases held CAS locks (prevents deadlock on LIMIT-induced early termination): good.
- Source-side merge correctly handles the two-level fan-in from N instances × 256 buckets.
Summary: 1 Critical, 1 Significant, 1 Major gap

- Critical: Data race on shared `probe_expr_ctxs` (concurrent mutation of non-thread-safe `VExprContext`)
- Significant: Memory reservation bypassed (`get_reserve_mem_size` returns 0)
- Major gap: Zero test coverage for ~2700 lines of new code
```cpp
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
    int result_column_id = -1;
    RETURN_IF_ERROR(shared_state.probe_expr_ctxs[i]->execute(block, &result_column_id));
```
Critical: Data race on shared `probe_expr_ctxs`

`shared_state.probe_expr_ctxs[i]->execute()` is called by ALL sink instances concurrently on the SAME shared `VExprContext` objects. `VExprContext::execute()` mutates `_last_result_column_id` (a non-atomic `int`) and potentially `FunctionContext` internal state.

The `probe_expr_ctxs` were cloned once in `call_once` (lines 88-94) into shared state, then used concurrently by all sink instances without per-instance cloning.

Note that `_aggregate_evaluators` are already correctly cloned per-instance (lines 113-115) with a comment explaining the same class of bug. The same treatment is needed here.

Fix: Each sink instance should clone its own `probe_expr_ctxs` in `open()` (similar to lines 113-115), and `_execute_with_serialized_key` should use the per-instance clones instead of `shared_state.probe_expr_ctxs`.
```cpp
    return DataDistribution(ExchangeType::NOOP);
}

size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { return 0; }
```
Significant: Memory reservation completely bypassed

`get_reserve_mem_size()` returns 0, which disables the pipeline memory reservation protocol for this operator. For comparison, the existing `AggSinkOperatorX::get_reserve_mem_size()` returns `hash_table->estimate_memory(batch_size) + _memory_usage_last_executing`.

With 256 hash tables per instance × N pipeline instances, hash table resizes can cause massive uncontrolled memory growth with no back-pressure mechanism. There is also:

- No `SCOPED_PEAK_MEM` instrumentation (unlike the regular agg operator)
- No spill path as a fallback
- No `_memory_sufficient_dependency` wiring (per `be/src/exec/AGENTS.md` requirements)

Even if spill support is deferred, the reservation protocol should still provide accurate estimates so the scheduler can apply back-pressure before OOM.
Force-pushed from 5e132d9 to 5773305
FE UT Coverage Report: increment line coverage
Force-pushed from 15e1a5a to 142e1da
Cloud UT Coverage Report: increment line coverage, increment coverage report

BE UT Coverage Report: increment line coverage, increment coverage report
This pull request introduces a new bucketed hash aggregation operator for the pipeline engine, refactors aggregation data variant handling to support this new operator, and adds supporting infrastructure for efficient memory usage and operator registration. The main changes include new source and sink operator implementations for bucketed aggregation, a flexible and reusable aggregation data variant base, and various supporting improvements for memory management and code organization.
Bucketed Hash Aggregation Operator Implementation:
Added `bucketed_aggregation_sink_operator.h` and `bucketed_aggregation_source_operator.h`, implementing the sink and source operators for bucketed hash aggregation, including local state management, per-bucket hash tables, and pipelined merge logic. [1] [2]

Aggregation Data Variant Refactoring:
Refactored `agg_utils.h` to introduce a parameterized `AggMethodVariantsBase` and `AggDataVariantsBase`, supporting both traditional and bucketed aggregation with different string key hash map implementations. Added `BucketedAggDataVariants` and associated types for bucketed aggregation. [1] [2]

Performance and Memory Improvements:

These changes collectively enable efficient, parallel, and memory-aware bucketed hash aggregation in the pipeline engine, improving scalability and paving the way for further aggregation optimizations.