[Improvement](streaming-agg) adjust single streaming threshold #61104
BiteTheDDDDt wants to merge 2 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
|
/review
Code Review Summary
PR Goal
This PR introduces an optimization for streaming aggregation operators by detecting single-backend queries and using more aggressive (higher) hash table expansion thresholds. The intent is to reduce wasteful pre-aggregation when all data is local.
Critical Checkpoint Conclusions
1. Does the code accomplish the goal? Is there a test that proves it?
- No, the feature is effectively dead code due to a critical timing bug (see Issue 1 below).
- `set_single_backend_query` is called after `context->prepare()`, but the local states that read this flag are constructed during `prepare()`. The flag will always be `false` when read.
- No tests are provided: neither regression tests nor BE unit tests verify the new behavior.
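A minimal self-contained sketch of the ordering hazard described above (all class and function names are simplified stand-ins for the real Doris types, not the actual code):

```cpp
#include <cassert>

// Simulated query context holding the flag (plain bool, as in the PR).
struct QueryContext {
    bool single_backend = false;
};

// Local states snapshot the flag at construction time, which is what
// happens inside prepare() in the real operator.
struct LocalState {
    bool is_single_backend;
    explicit LocalState(const QueryContext& ctx) : is_single_backend(ctx.single_backend) {}
};

// prepare() constructs local states, snapshotting the flag.
LocalState prepare(const QueryContext& ctx) { return LocalState(ctx); }

// Buggy order: the flag is set after prepare(), so the snapshot is stale.
LocalState buggy_order(QueryContext& ctx) {
    LocalState state = prepare(ctx);
    ctx.single_backend = true; // set too late, never observed
    return state;
}

// Fixed order: set the flag before prepare() so local states see it.
LocalState fixed_order(QueryContext& ctx) {
    ctx.single_backend = true;
    return prepare(ctx);
}
```

With the buggy order the snapshot stays `false` forever; moving the set before `prepare()` makes the flag visible.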
2. Is this modification as small, clear, and focused as possible?
- The change also silently modifies the L2 cache threshold for the default (non-single-BE) path in `streaming_aggregation_operator.cpp` from `1024 * 1024` to `256 * 1024` (see Issue 2). This is a separate behavioral change that affects all queries.
3. Concurrency concerns?
- `_is_single_backend_query` in `QueryContext` is a plain `bool`, not `std::atomic<bool>`. Since `set_single_backend_query` may be called on a different thread than the one reading the flag during `prepare()`, there is technically a data race. However, this is moot given the timing bug (Issue 1): once fixed by moving the set before `prepare()`, the race disappears.
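If the cross-thread write were kept, one hardening option would be to make the flag atomic. A sketch, under the assumption that relaxed ordering is sufficient for a write-once hint flag (names mirror the PR but this is illustrative, not the actual Doris class):

```cpp
#include <atomic>
#include <cassert>

class QueryContext {
public:
    // Relaxed ordering: the flag is a standalone hint, not used to
    // publish other data, so no acquire/release pairing is needed.
    void set_single_backend_query(bool v) {
        _is_single_backend_query.store(v, std::memory_order_relaxed);
    }
    bool is_single_backend_query() const {
        return _is_single_backend_query.load(std::memory_order_relaxed);
    }

private:
    std::atomic<bool> _is_single_backend_query{false};
};
```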
4. Are there functionally parallel code paths?
- Yes. Both the legacy Coordinator path (`Coordinator.sendPipelineCtx`) and the Nereids path (`ThriftPlansBuilder.plansToThrift`) correctly set the flag. These are mutually exclusive paths, so there is no inconsistency.
5. Configuration items?
- No configuration is added; the threshold values are hardcoded. Consider whether this should be a session variable, or at least a `config` flag to enable/disable the optimization.
6. Incompatible changes?
- The new Thrift field `single_backend_query` (field 202) is `optional` with default `false`, so it is backward-compatible during rolling upgrades.
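For reference, the field as described would look roughly like this in Thrift (surrounding fields elided; a sketch based on the review text, not the exact diff):

```thrift
struct TQueryOptions {
  // ... existing fields elided ...
  202: optional bool single_backend_query = false;
}
```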
7. Test coverage?
- No tests provided. For a performance optimization that changes aggregation behavior, at minimum a regression test demonstrating the different code paths should be included.
8. Performance analysis?
- The approach is sound conceptually — single-backend queries don't benefit from weak pre-aggregation since there is no shuffle overhead to amortize. However, the 5x/10x thresholds are very aggressive and may cause under-aggregation in moderate reduction scenarios. This is a tuning concern.
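To make the tuning concern concrete, here is an illustrative model of how such a threshold table gates hash-table growth (simplified; the values are those quoted for the single-BE table in this review, and the function name is hypothetical):

```cpp
#include <cassert>
#include <cstddef>

struct StreamingHtMinReductionEntry {
    long min_ht_mem;                   // hash table memory footprint threshold
    double streaming_ht_min_reduction; // reduction required at that footprint
};

// Single-backend table quoted in this review: expand freely up to L2,
// require 5x reduction past 256KB, 10x past 16MB.
static constexpr StreamingHtMinReductionEntry SINGLE_BE[] = {
        {0, 0.0},
        {256 * 1024, 5.0},
        {16 * 1024 * 1024, 10.0},
};

// Returns the minimum reduction factor required before the hash table is
// allowed to keep growing at the given memory footprint: the last entry
// whose min_ht_mem the current footprint has reached.
double required_reduction(long ht_mem, const StreamingHtMinReductionEntry* table,
                          std::size_t n) {
    double r = 0.0;
    for (std::size_t i = 0; i < n; ++i) {
        if (ht_mem < table[i].min_ht_mem) break;
        r = table[i].streaming_ht_min_reduction;
    }
    return r;
}
```

Under this model, a moderate 3x reduction workload that has grown past 256KB would fall below the 5.0 bar and stop expanding, which is exactly the under-aggregation risk noted above.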
Issues Found
See inline comments for details.
```diff
 // Expand into L3 cache if we look like we're getting some reduction.
 // At present, The L2 cache is generally 1024k or more
-{1024 * 1024, 1.1},
+{.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
```
[Behavioral Change — Needs Clarification] The L2 cache threshold for the default (non-single-BE) path was silently changed from 1024 * 1024 (1MB) to 256 * 1024 (256KB). This means all multi-backend queries are also affected — they will now require a 1.1x reduction at 256KB instead of 1MB to expand into L3 cache.
The comment still says "The L2 cache is generally 1024k or more", which is now inconsistent with the 256KB value.
Is this intentional? If so, it should be called out in the PR description as a separate change. If not, this is a regression affecting all queries.
```cpp
    // The L3 cache is generally 16MB or more
    {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
};
```
[Robustness] STREAMING_HT_MIN_REDUCTION_SIZE is computed from STREAMING_HT_MIN_REDUCTION but is also used to bound iteration over SINGLE_BE_STREAMING_HT_MIN_REDUCTION. Both arrays currently have 3 entries, but if someone modifies one array without the other, this would silently cause out-of-bounds access or skip entries.
Consider adding a `static_assert` to enforce that they stay in sync:

```cpp
static_assert(sizeof(SINGLE_BE_STREAMING_HT_MIN_REDUCTION) == sizeof(STREAMING_HT_MIN_REDUCTION),
              "SINGLE_BE and default reduction tables must have the same number of entries");
```

The same applies to `streaming_aggregation_operator.cpp`.
Pull request overview
This PR optimizes streaming aggregation performance for queries that run on a single backend. It introduces a single_backend_query flag that propagates from the FE to the BE, allowing streaming aggregation operators to use more aggressive hash table expansion thresholds when all data is local to one backend. The higher thresholds (5.0 and 10.0 vs. 1.1 and 2.0) mean the hash table will be expanded more aggressively, favoring in-memory aggregation over streaming passthrough on single-backend queries.
Changes:
- Added a `single_backend_query` field to `TQueryOptions` (Thrift), with FE logic in `CoordinatorContext`, `Coordinator`, and `ThriftPlansBuilder` to detect single-backend scenarios and propagate the flag.
- Added an `_is_single_backend_query` field and getter/setter to `QueryContext`, with initialization in `FragmentMgr::exec_plan_fragment`.
- Both `StreamingAggLocalState` and `DistinctStreamingAggLocalState` now select between two sets of hash table reduction thresholds based on the single-backend flag.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| `gensrc/thrift/PaloInternalService.thrift` | Added `single_backend_query` boolean field to `TQueryOptions` |
| `fe/.../ThriftPlansBuilder.java` | Sets `single_backend_query` on query options from `CoordinatorContext` |
| `fe/.../CoordinatorContext.java` | Adds memoized `isSingleBackendQuery` supplier based on backend count |
| `fe/.../Coordinator.java` | Sets `single_backend_query` on per-fragment params in legacy path |
| `be/.../query_context.h` | Adds `_is_single_backend_query` field with getter/setter |
| `be/.../fragment_mgr.cpp` | Sets `single_backend_query` on `QueryContext` from params |
| `be/.../streaming_aggregation_operator.h` | Adds `_is_single_backend` member to `StreamingAggLocalState` |
| `be/.../streaming_aggregation_operator.cpp` | Defines `SINGLE_BE_STREAMING_HT_MIN_REDUCTION` and uses it based on flag; also changes L2 threshold from 1MB to 256KB in the default table |
| `be/.../distinct_streaming_aggregation_operator.h` | Adds `_is_single_backend` member to `DistinctStreamingAggLocalState` |
| `be/.../distinct_streaming_aggregation_operator.cpp` | Defines `SINGLE_BE_STREAMING_HT_MIN_REDUCTION` and uses it based on flag |
Comments suppressed due to low confidence (1)
be/src/pipeline/exec/streaming_aggregation_operator.cpp:90
`STREAMING_HT_MIN_REDUCTION_SIZE` is computed from `STREAMING_HT_MIN_REDUCTION` but is also used to bound the loop when iterating over `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`. If the two tables ever diverge in size, this would cause an out-of-bounds access or premature termination. Consider either adding a `static_assert` that both arrays have the same size, or computing a separate size constant for the single-BE table.
```cpp
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
        sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
```
```cpp
    // The L3 cache is generally 16MB or more
    {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
};
```
STREAMING_HT_MIN_REDUCTION_SIZE is computed from STREAMING_HT_MIN_REDUCTION but also used to bound the loop when iterating over SINGLE_BE_STREAMING_HT_MIN_REDUCTION. If the two tables ever diverge in size, this would cause an out-of-bounds access or premature termination. Consider either adding a static_assert that both arrays have the same size, or computing a separate size constant for the single-BE table.
```cpp
static_assert(sizeof(STREAMING_HT_MIN_REDUCTION) ==
                      sizeof(SINGLE_BE_STREAMING_HT_MIN_REDUCTION),
              "STREAMING_HT_MIN_REDUCTION and SINGLE_BE_STREAMING_HT_MIN_REDUCTION "
              "must have the same size");
```
```diff
 // Expand into L3 cache if we look like we're getting some reduction.
 // At present, The L2 cache is generally 1024k or more
-{1024 * 1024, 1.1},
+{.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
```
The L2 cache min_ht_mem threshold was changed from 1024 * 1024 (1MB) to 256 * 1024 (256KB) for the non-single-backend case. This is a behavioral change to the default (multi-backend) streaming aggregation path that is not mentioned in the PR description. Additionally, the comment on the preceding line says "The L2 cache is generally 1024k or more", which is inconsistent with the new 256KB threshold. Please either update the comment to explain the rationale for 256KB, or confirm that this change is intentional and document it in the PR description.
```cpp
    // Expand into L3 cache if we look like we're getting some reduction.
    // At present, The L2 cache is generally 1024k or more
    {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
```
The comment says "The L2 cache is generally 1024k or more" but the min_ht_mem threshold is set to 256 * 1024 (256KB). The comment is inconsistent with the value. Please update the comment to match the chosen threshold and explain the rationale.
```cpp
static constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
        // Expand up to L2 cache always.
        {.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
        // Expand into L3 cache if we look like we're getting some reduction.
        // At present, The L2 cache is generally 1024k or more
        {.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 5.0},
        // Expand into main memory if we're getting a significant reduction.
        // The L3 cache is generally 16MB or more
        {.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 10.0},
};
```
The `StreamingHtMinReductionEntry` struct, both reduction tables (`STREAMING_HT_MIN_REDUCTION` and `SINGLE_BE_STREAMING_HT_MIN_REDUCTION`), and `STREAMING_HT_MIN_REDUCTION_SIZE` are now fully duplicated between `streaming_aggregation_operator.cpp` and `distinct_streaming_aggregation_operator.cpp`. Consider extracting these into a shared header to avoid maintaining two copies and keeping them in sync by hand.
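A possible shape for such a shared header (the file name, namespace, and the default table's 2.0 L3 entry are assumptions inferred from the thresholds quoted in this review, not the actual Doris code):

```cpp
#include <cassert>

// Hypothetical shared header body consolidating the duplicated tables.
namespace doris::pipeline {

struct StreamingHtMinReductionEntry {
    long min_ht_mem;                   // hash table memory footprint threshold
    double streaming_ht_min_reduction; // reduction required at that footprint
};

// Default (multi-backend) table.
inline constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
        {0, 0.0},
        {256 * 1024, 1.1},
        {16 * 1024 * 1024, 2.0},
};

// More aggressive table for single-backend queries.
inline constexpr StreamingHtMinReductionEntry SINGLE_BE_STREAMING_HT_MIN_REDUCTION[] = {
        {0, 0.0},
        {256 * 1024, 5.0},
        {16 * 1024 * 1024, 10.0},
};

// Keep both tables the same length so a single size constant can safely
// bound loops over either of them.
static_assert(sizeof(STREAMING_HT_MIN_REDUCTION) ==
              sizeof(SINGLE_BE_STREAMING_HT_MIN_REDUCTION));

inline constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
        sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);

} // namespace doris::pipeline
```

Both operators would then include this header and drop their local copies, making a size mismatch impossible by construction.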
run buildall
BE Regression && UT Coverage Report: increment line coverage report.
FE Regression Coverage Report: increment line coverage report.
This pull request introduces an optimization for streaming aggregation operators by detecting when a query is assigned to a single backend and applying more aggressive hash table expansion thresholds in that case. The change propagates a new `single_backend_query` flag from the frontend to the backend, and both the `streaming_aggregation_operator` and `distinct_streaming_aggregation_operator` use this flag to select different hash table reduction thresholds, potentially improving performance for single-backend queries.

Single-backend query detection and propagation:
- Added a `single_backend_query` field to `TQueryOptions` and set it in the FE (`CoordinatorContext`, `Coordinator`, and `ThriftPlansBuilder`) when all fragments are assigned to a single backend. [1] [2] [3] [4] [5] [6]
- Propagated the flag to `QueryContext`, with new getter and setter methods and a backing field. [1] [2] [3]

Streaming aggregation operator optimization:
- Added a new threshold table (`SINGLE_BE_STREAMING_HT_MIN_REDUCTION`) in both `streaming_aggregation_operator.cpp` and `distinct_streaming_aggregation_operator.cpp`. [1] [2]
- Each local state reads the `single_backend_query` flag and uses the appropriate threshold table to decide when to expand hash tables. [1] [2] [3] [4]