[refactor](local shuffle) Move local exchange planning from BE to FE #63366
924060929 wants to merge 7 commits into
/review
2e22e0a to 6fa1901
run buildall
I found two blocking issues in the FE local exchange planner path. Critical checkpoints:
- Goal/test coverage: the PR implements FE-side local exchange planning with substantial unit/regression coverage, but the new default path is not safe for mixed FE/BE versions and has a correctness gap in serial-source gating.
- Scope: the change is focused on local exchange planning, though it is broad and cross-module.
- Concurrency/lifecycle: no direct new shared Java concurrency issue found; BE deferred local exchanger lifecycle appears intentionally wired before pipeline prepare.
- Configuration/compatibility: enable_local_shuffle_planner defaults to true and sends new thrift plan nodes/types without a visible BE-version gate, which is a rolling-upgrade blocker.
- Parallel paths: the old coordinator is forced to BE planning, but the Nereids FE path still needs the compatibility and serial-gating fixes.
- Testing: good coverage was added, but it does not cover mixed-version execution or the non-serial-source fragment cases noted inline.
- User focus: no additional user-provided focus points.
description = {"是否在FE规划Local Shuffle",
        "Whether to plan local shuffle in frontend"}, needForward = true)
private boolean enableLocalShufflePlanner = true;
This default enables the FE planner to serialize the new LOCAL_EXCHANGE_NODE/TLocalPartitionType protocol to every BE as soon as the FE is upgraded. During a rolling upgrade, an old BE does not have TPlanNodeType::LOCAL_EXCHANGE_NODE handling in _create_operator and will reject/fail such fragments, while RuntimeState::plan_local_shuffle() also disables the legacy BE planner because enable_local_shuffle_planner is set. Please gate this path on BE capability/version (or keep the default off until all BEs support the new node) so mixed FE/BE deployments continue to execute queries during upgrade.
Our deployment process upgrades BE before FE, so during a rolling upgrade an old BE never receives LOCAL_EXCHANGE_NODE from a new FE — the new thrift node type is only sent after all BEs in the cluster have been upgraded. Keeping the default true is intentional so we get the FE planner enabled out of the box once the rollout completes. If you have a deployment topology that flips this order, please set enable_local_shuffle_planner=false in fe.conf or via SQL and the legacy BE path stays bit-for-bit identical to the old behavior.
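For readers following this thread, the capability gate the reviewer asks for could look roughly like the sketch below. It is illustrative only: `BackendInfo`, `supportsLocalExchangeNode`, and `shouldPlanLocalExchangeInFe` are hypothetical names invented for this example, not existing Doris APIs.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a BE-capability gate; none of these types exist in Doris. */
final class LocalShuffleCapabilityGate {

    /** Minimal stand-in for per-backend metadata (assumption for illustration). */
    static final class BackendInfo {
        final String host;
        final boolean supportsLocalExchangeNode;

        BackendInfo(String host, boolean supportsLocalExchangeNode) {
            this.host = host;
            this.supportsLocalExchangeNode = supportsLocalExchangeNode;
        }
    }

    /**
     * FE plans local exchange only when the session flag is on and every
     * backend that may receive a fragment understands LOCAL_EXCHANGE_NODE.
     */
    static boolean shouldPlanLocalExchangeInFe(boolean enableLocalShufflePlanner,
                                               List<BackendInfo> backends) {
        if (!enableLocalShufflePlanner) {
            return false;
        }
        for (BackendInfo be : backends) {
            if (!be.supportsLocalExchangeNode) {
                return false; // mixed-version cluster: stay on the legacy BE path
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<BackendInfo> mixed = Arrays.asList(
                new BackendInfo("be-1", true),
                new BackendInfo("be-2", false));
        System.out.println(shouldPlanLocalExchangeInFe(true, mixed));
    }
}
```

Since the review notes that `RuntimeState::plan_local_shuffle()` disables the legacy planner whenever the flag is set, a real gate would presumably also have to send the flag to BE as false for the affected queries.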
// 4. Layer 1: skip LE when serial operator or ancestor in same pipeline
// Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip
if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) {
    return childOutput;
This skip uses isSerialNode() even though the comment above isSerialOperatorOnBe() says that a node with isSerialNode()==true only actually runs with one BE task when fragment.useSerialSource(context) is true. For fragments where useSerialSource is false (for example ignore_storage_data_distribution=false, query cache, or NAAJ), a node such as a scalar aggregate or unpartitioned exchange can still return isSerialNode()==true but BE will execute it with normal parallelism (is_serial_operator=false in thrift). In that case this branch skips a required LocalExchange even though BE would not consider the ancestor serial, so downstream hash/passthrough requirements can be silently dropped. The serial-ancestor propagation at line 1094 has the same issue. Please base these planner decisions on isSerialOperatorOnBe(translatorContext.getConnectContext()), not the syntactic isSerialNode(), except for the explicitly documented heavy-op/local-fragment cases.
Good catch — fixed in 15d92ba. Both the Layer 1 skip and the serial-ancestor propagation in enforceRequire now use isSerialOperatorOnBe(translatorContext.getConnectContext()) instead of the raw isSerialNode().
Verified that BE's OperatorBase constructs from the Thrift is_serial_operator flag (which FE writes via isSerialOperatorOnBe, not isSerialNode) — so Pipeline::need_to_local_exchange's op->is_serial_operator() check returns false when fragment.useSerialSource(ctx) is false, even if isSerialNode() is true. The previous code would have over-skipped LocalExchange in exactly the scenarios you listed (ignore_storage_data_distribution=false, query cache, NAAJ).
Updated LocalShuffleNodeCoverageTest.testMaterializationNode and testSetOperationAndAssertNumRowsNode to reflect the corrected behavior: in the fragment-less unit-test path isSerialOperatorOnBe returns false (the fragment != null guard) so the framework no longer skips Layer 1 and inserts the required LocalExchange.
Other isSerialNode() call sites in PlanNode.java were audited and left as-is:
- toThrift() already uses isSerialOperatorOnBe
- hasSerialChildren() is a pure node-level tree walk used only for fragment-internal heuristics
- createLocalExchange() heavy-op gate is already inside a fragment.useSerialSource(ctx) branch, so isSerialNode and isSerialOperatorOnBe are equivalent there
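The distinction this thread turns on can be shown with a small model. This is a simplified sketch, assuming `isSerialOperatorOnBe` reduces to "fragment present, fragment uses a serial source, and the node is syntactically serial"; the real method may carry extra conditions (the heavy-op and local-fragment cases mentioned above), and the `Fragment` and `Node` classes here are stand-ins, not the Doris classes.

```java
/** Simplified model of the serial-gating fix; class shapes are assumptions. */
final class SerialGatingSketch {

    static final class Fragment {
        final boolean useSerialSource;

        Fragment(boolean useSerialSource) {
            this.useSerialSource = useSerialSource;
        }
    }

    static final class Node {
        final boolean serialNode; // syntactic property of the plan node
        final Fragment fragment;  // null in the fragment-less unit-test path

        Node(boolean serialNode, Fragment fragment) {
            this.serialNode = serialNode;
            this.fragment = fragment;
        }

        boolean isSerialNode() {
            return serialNode;
        }

        /** What BE will actually see in the thrift is_serial_operator flag. */
        boolean isSerialOperatorOnBe() {
            return fragment != null && fragment.useSerialSource && serialNode;
        }

        /** Layer 1 skip after the fix: gate on the BE view, not the syntactic one. */
        boolean skipsLayer1(boolean hasSerialAncestorInPipeline) {
            return hasSerialAncestorInPipeline || isSerialOperatorOnBe();
        }
    }

    public static void main(String[] args) {
        // Scalar aggregate in a fragment where useSerialSource is false.
        Node agg = new Node(true, new Fragment(false));
        System.out.println(agg.isSerialNode() + " " + agg.skipsLayer1(false));
    }
}
```

With the pre-fix gating (`isSerialNode()`), the node in `main` would have skipped its LocalExchange; with the BE view it does not.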
6fa1901 to 15d92ba
run buildall
TPC-H: Total hot run time: 31734 ms
f87c73e to 4affd22
run buildall
4affd22 to 287ef12
TPC-H: Total hot run time: 31488 ms
run buildall
287ef12 to 2d021f7
Previously, local exchange (LE) nodes were inserted exclusively by the BE's `_plan_local_exchange` at pipeline build time. The FE had no visibility into which operators needed a fan-out or shuffle before execution, making it impossible to validate, optimize, or override LE decisions at planning time. This PR introduces a full FE-side local exchange planner that mirrors BE semantics, brings several correctness fixes, and leaves the legacy BE path fully intact behind a feature flag. See "Current architecture notes" at the bottom for what the FE planner does and does not own.

A new `AddLocalExchange` pass runs after normal fragment assignment. It walks each fragment's plan tree bottom-up, calling the polymorphic `PlanNode.enforceAndDeriveLocalExchange()` on every node. Nodes declare what distribution they require of their children; the framework inserts `LocalExchangeNode` where needed.

`LocalExchangeNode` represents intra-fragment data redistribution and supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP.

The pass is guarded by `enable_local_shuffle_planner` (default true). When disabled, BE continues to run its own `_plan_local_exchange` as before, keeping the old path fully intact.

`maxPerBeInstances` (max pipeline instances assigned to any single BE) is used instead of a global `instanceCount`. Planning is a no-op when `maxPerBeInstances == 1`, because inserting LE on a single-threaded pipeline would cause task-count mismatches and pipeline starvation.

When a serial operator (e.g. OlapScanNode with a single tablet bucket) feeds a non-serial parent without an intermediate LE, downstream tasks starve waiting for data that never arrives. The framework detects this case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly matching BE's `required_data_distribution()` serial → PASSTHROUGH rule.
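The bottom-up enforce-and-derive walk described above can be sketched with a toy plan tree. This is a minimal illustration under stated assumptions, not the PR's implementation: the two-value `Distribution` enum, the `PlanNode` fields, and the `enforce` helper are invented for the example, and the real pass distinguishes many more exchange types.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a bottom-up local-exchange pass; all names are illustrative. */
final class AddLocalExchangeSketch {

    enum Distribution { ANY, HASH }

    static class PlanNode {
        final String name;
        final Distribution requiredOfChildren; // what this node needs from each child
        final Distribution output;             // what this node produces
        final List<PlanNode> children = new ArrayList<>();

        PlanNode(String name, Distribution requiredOfChildren, Distribution output,
                 PlanNode... kids) {
            this.name = name;
            this.requiredOfChildren = requiredOfChildren;
            this.output = output;
            for (PlanNode kid : kids) {
                children.add(kid);
            }
        }
    }

    /** Intra-fragment redistribution node inserted by the pass. */
    static final class LocalExchangeNode extends PlanNode {
        LocalExchangeNode(PlanNode child) {
            super("LocalExchange", Distribution.ANY, Distribution.HASH, child);
        }
    }

    /** Walks bottom-up and wraps children whose output misses the requirement. */
    static PlanNode enforce(PlanNode node, int maxPerBeInstances) {
        if (maxPerBeInstances == 1) {
            return node; // single-task pipelines: planning is a no-op
        }
        for (int i = 0; i < node.children.size(); i++) {
            PlanNode child = enforce(node.children.get(i), maxPerBeInstances);
            boolean satisfied = node.requiredOfChildren == Distribution.ANY
                    || child.output == node.requiredOfChildren;
            node.children.set(i, satisfied ? child : new LocalExchangeNode(child));
        }
        return node;
    }

    static int countLocalExchanges(PlanNode node) {
        int n = node instanceof LocalExchangeNode ? 1 : 0;
        for (PlanNode child : node.children) {
            n += countLocalExchanges(child);
        }
        return n;
    }

    public static void main(String[] args) {
        PlanNode scan = new PlanNode("OlapScan", Distribution.ANY, Distribution.ANY);
        PlanNode agg = new PlanNode("Agg", Distribution.HASH, Distribution.HASH, scan);
        System.out.println(countLocalExchanges(enforce(agg, 4))); // prints 1
    }
}
```

The key design point carried over from the description is that each node only states its requirement; the decision to insert an exchange lives in one framework-level place.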
`LocalExchangeTypeRequire` abstracts two strategies:
- `RequireHash`: always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE` (safe for intra-fragment hash partitioning).
- `RequireSpecific`: preserves BUCKET_HASH_SHUFFLE / GLOBAL_EXECUTION_HASH_SHUFFLE without degradation.

PR #62438 added `enable_local_exchange_before_agg`, but its BE guard `!_needs_finalize && !enable_local_exchange_before_agg → base` conflated two semantically different cases in AggSink and DistinctStreamingAgg:
- **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg (performance-only) and FIRST_MERGE dedup (correctness-critical). The flag-gated early-return wrongly skipped HASH for FIRST_MERGE, producing PASSTHROUGH-over-serial-child → wrong aggregation results.
- **DistinctStreamingAgg**: `!finalize` covered both streaming preagg (`useStreamingPreagg=true`, performance) and non-streaming dedup (`useStreamingPreagg=false`, correctness). Same class of bug.

FE fix:
- AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL phases. FIRST_MERGE always emits HASH regardless of the flag.
- DistinctStreamingAgg: restrict to `useStreamingPreagg=true`. Non-streaming dedup always emits HASH.

Also add `requiresShuffleForCorrectness()` to mirror BE's `is_shuffled_operator()`, so SetOperationNode propagates the "downstream depends on hash" flag correctly instead of using the coarser `parentRequire.preferType().isHashShuffle()` check that over-inserted HASH LE on every union branch under a streaming preagg.

These fixes reduce FE/BE consistency mismatches from 8 to 3 (only pre-existing NLJ optimization differences remain).
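The guard change for the two aggregation operators can be written out as a pair of small decision functions. This is a simplified sketch: it assumes grouping keys are present and, for the distinct case, a non-finalizing phase; `Require`, `aggSinkRequire`, and `distinctStreamingRequire` are names made up for the example rather than methods from the PR.

```java
/** Decision-table sketch of the agg guards; names and shape are assumptions. */
final class AggRequireSketch {

    /** BASE = fall back to the base-class requirement; HASH = require a hash LE. */
    enum Require { BASE, HASH }

    /**
     * AggSink with grouping keys: the flag-gated base path applies only to
     * non-merge (LOCAL) phases, so FIRST_MERGE keeps HASH even with the flag off.
     */
    static Require aggSinkRequire(boolean needsFinalize, boolean isMerge,
                                  boolean enableLocalExchangeBeforeAgg) {
        if (!needsFinalize && !isMerge && !enableLocalExchangeBeforeAgg) {
            return Require.BASE;
        }
        return Require.HASH;
    }

    /**
     * DistinctStreamingAgg (non-finalizing phase assumed): only the streaming
     * preagg is performance-only; non-streaming dedup always needs HASH.
     */
    static Require distinctStreamingRequire(boolean useStreamingPreagg,
                                            boolean enableLocalExchangeBeforeAgg) {
        if (useStreamingPreagg && !enableLocalExchangeBeforeAgg) {
            return Require.BASE;
        }
        return Require.HASH;
    }

    public static void main(String[] args) {
        // FIRST_MERGE with the flag off: still HASH after the fix.
        System.out.println(aggSinkRequire(false, true, false));
        // LOCAL preagg with the flag off: base path.
        System.out.println(aggSinkRequire(false, false, false));
    }
}
```

Dropping the `!isMerge` term from the first function reproduces the pre-fix behavior the message describes, where FIRST_MERGE lost its HASH requirement whenever the flag was off.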
- `enable_local_shuffle_planner`: use FE planner (default true)
- `enable_local_shuffle`: master switch for local shuffle
- `enable_local_exchange_before_agg`: HASH LE before non-final agg (default true, mirrors #62438)

`validateNoSerialWithoutLocalExchange()` walks the final plan tree and logs a warning whenever a serial operator feeds a non-serial parent without an intermediate LocalExchangeNode, catching planning gaps before execution.

- `test_enable_local_exchange_before_agg.groovy`: 10 agg patterns with the flag on and off; covers the FIRST_MERGE and DistinctStreamingAgg correctness fixes.
- `test_local_shuffle_fe_be_consistency.groovy`: runs the same SQL with `enable_local_shuffle_planner=true` and `=false` across the full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union, TableFunction, AssertNumRows, RQG-derived corner cases) and asserts result rows are identical. Only data correctness is asserted; the two planners legitimately differ on the exact exchange counts/types they emit, so plan-shape equality is intentionally not checked.
- `test_local_shuffle_rqg_bugs.groovy`: reproduces 20+ RQG-found crashes and wrong-result cases.
- `test_old_coordinator_local_shuffle.groovy`: verifies the old coordinator path is unaffected.
- `test_multilevel_join_agg_local_shuffle.groovy`: multi-level join and aggregation plan shapes.

- `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
- `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`) on macOS.
- `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` → `TLocalPartitionType::NOOP` after thrift enum rename.

This PR puts the FE planner in the driver's seat for LE insertion but intentionally does NOT remove the BE-side machinery. Readers should be aware of three pieces the FE planner shares with or defers to BE:
1. **`is_serial_operator` is computed on both sides.** FE computes the flag and writes it into Thrift, but BE's `OperatorBase::is_serial_operator()` is still overridden per operator in C++ and used for BE-side runtime decisions. Any future change to the BE override needs to be mirrored on the FE side (and vice versa) to keep the planner's view consistent with execution.
2. **The legacy BE planner stays as a fallback.** `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved and gated by `runtime_state.h::plan_local_shuffle()`: when `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as before. The two paths are mutually exclusive, never both running on the same query.
3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety net.** The two propagation passes in `pipeline_fragment_context.cpp` fix up paired pipelines whose `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline splits leave a serial Exchange feeding an N-task sink). FE's framework-level serial→non-serial fan-out (`enforceRequire` step 3) and the `validateNoSerialWithoutLocalExchange` check aim to make these mismatches impossible by construction, but the BE-side fixup remains as a defensive guard.

Co-authored-by: Gabriel <liwenqiang@selectdb.com>
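The `validateNoSerialWithoutLocalExchange()` check mentioned in this commit message amounts to a parent/child tree walk. The sketch below shows one plausible shape for it; the `Node` class, its boolean flags, and `findGaps` are assumptions for illustration, and the real check logs warnings instead of returning them.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a "serial child under non-serial parent" validation walk. */
final class SerialValidationSketch {

    static final class Node {
        final String name;
        final boolean serial;        // runs as a single task
        final boolean localExchange; // true for LocalExchange nodes
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean serial, boolean localExchange, Node... kids) {
            this.name = name;
            this.serial = serial;
            this.localExchange = localExchange;
            for (Node kid : kids) {
                children.add(kid);
            }
        }
    }

    /** Returns one message per serial operator that feeds a non-serial, non-LE parent. */
    static List<String> findGaps(Node root) {
        List<String> warnings = new ArrayList<>();
        collect(root, warnings);
        return warnings;
    }

    private static void collect(Node parent, List<String> warnings) {
        for (Node child : parent.children) {
            if (child.serial && !parent.serial && !parent.localExchange) {
                warnings.add("serial " + child.name + " feeds non-serial "
                        + parent.name + " without LocalExchange");
            }
            collect(child, warnings);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("OlapScan", true, false);
        Node agg = new Node("Agg", false, false, scan);
        System.out.println(findGaps(agg));
    }
}
```

Placing a LocalExchange node between the scan and the aggregate in `main` makes the returned list empty, which is the state the planner aims for by construction.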
2d021f7 to 9016e5a
… DSL
- Delete duplicate testAggFromScanUsesLocalExecutionHashShuffle
- Rewrite 8 substring-based tests as DSL shape assertions
- Add testUnionAllScanAndValues (Tier B from Trino)
- Add assertNoLocalExchangeOfType helper for negative checks
- Add nestedLoopJoin/partitionSort/olapScan() factories to PlanShape(Dsl)
…_multilevel_join_agg_local_shuffle

The local-shuffle FE planner inserts LocalExchange nodes after the Nereids physical plan stage, so `explain shape plan` output is independent of enable_local_shuffle_planner. When the on/off variants did differ, the diff was always driven by stats-sensitive rewrites (e.g. cost-based InferSetOperatorDistinct), not by the planner mode itself, meaning the shape check was effectively asserting stats stability across environments, which it cannot guarantee.

Keep result-equality (qt_*_result_on / _result_off) and check_sql_equal between planner modes; drop the shape assertions and the 72 _shape_on blocks from the .out file.
run buildall
TPC-H: Total hot run time: 31328 ms
…local-shuffle planner

FE-side local-shuffle planner wraps a serial fragment root with a PASSTHROUGH LocalExchangeNode (AddLocalExchange#addLocalExchangeForFragment) so the data sink can fan out across pipeline tasks. That replaces `fragment.getPlanRoot()` with a LocalExchangeNode wherever the original root was a serial FileScanNode.

InsertIntoTableCommand#applyInsertPlanStatistic was using `fragment.getPlanRoot() instanceof FileScanNode` to find load-source scans, so after the LE wrap the instanceof check fails, addLoadFileInfo is never called, and LoadStatistic.fileNum / totalFileSizeB stay 0 even though the BE-side scannedRows / loadBytes counters work normally.

Symptom: job_p0.streaming_job.test_streaming_insert_job fails with loadStat.fileNumber == 0 (expected 2) and loadStat.fileSize == 0 (expected 256) while scannedRows / loadBytes are correct.

Fix: peel any LocalExchangeNode wrappers off the fragment root before the instanceof check, then extract fileNum / totalFileSize from the underlying FileScanNode as before.

Verified locally: INSERT INTO t SELECT * FROM LOCAL(...) shows FileNumber=1 FileSize=12 with the fix, FileNumber=0 FileSize=0 without it.
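The "peel the wrappers, then check the type" fix described in this commit message can be illustrated in a few lines. The classes below are bare stand-ins for the Doris plan nodes, and `peelLocalExchange` is a hypothetical helper name, so treat this as a sketch of the idea rather than the committed code.

```java
/** Stand-in classes showing the wrapper-peeling idea; not the Doris types. */
final class PeelLocalExchangeSketch {

    static class PlanNode {
    }

    static final class FileScanNode extends PlanNode {
    }

    static final class LocalExchangeNode extends PlanNode {
        final PlanNode child;

        LocalExchangeNode(PlanNode child) {
            this.child = child;
        }
    }

    /** Strips any chain of LocalExchangeNode wrappers from the fragment root. */
    static PlanNode peelLocalExchange(PlanNode root) {
        PlanNode node = root;
        while (node instanceof LocalExchangeNode) {
            node = ((LocalExchangeNode) node).child;
        }
        return node;
    }

    public static void main(String[] args) {
        PlanNode root = new LocalExchangeNode(new FileScanNode());
        // Before the fix the check ran on root directly and failed.
        System.out.println(root instanceof FileScanNode);                    // prints false
        System.out.println(peelLocalExchange(root) instanceof FileScanNode); // prints true
    }
}
```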
…rk it serial (DORIS-25865)
The FE local-shuffle planner used to insert a LocalExchangeNode directly
under RecursiveCteNode, which broke two RecursiveCte invariants:
1. ThriftPlansBuilder locates the recursive sender fragment via
`recursiveCteNode.getChild(1).getChild(0).getFragment()`. A wrapper LE
between RecCte and the cross-fragment ExchangeNode shifts that path off
the receiver and pulls the recursive producer fragment into
`fragmentsToReset`, so BE rejects with
[INTERNAL_ERROR]Fragment N contains a recursive CTE node
from RecCTESourceOperatorX::prepare().
2. BE's RecCTESourceOperatorX::is_serial_operator() always returns true.
RecursiveCteNode#isSerialNode() on the FE side defaulted to false, so
the planner left the producer fragment with parallel=N sender pipelines
even though only one instance actually emits data. The downstream
cross-fragment Exchange then waits forever on the N-1 silent senders.
Fix in RecursiveCteNode:
- override isSerialNode() to return true so addLocalExchangeForFragment
wraps the fragment root with PASSTHROUGH LE and fans the single
producer out to N parallel sinks (mirrors BE-native behaviour);
- override enforceAndDeriveLocalExchange to call children's own
enforceAndDeriveLocalExchange directly, bypassing the framework's
enforceRequire so no LE gets inserted between RecCte and its
cross-fragment Exchange children — children's subtrees still get LE
planning as normal.
Add regression test test_local_shuffle_recursive_cte covering the three
downstream consumer shapes the JIRA listed plus join / negative control:
rec_cte_agg, rec_cte_window, rec_cte_grouping_sets, rec_cte_select,
rec_cte_join. Each is asserted to produce identical rows under
enable_local_shuffle_planner=true vs =false.
run buildall
Two regression checks were comparing actual rows against .out in a specific order even though the SQL was order-insensitive at that point. The FE local-shuffle planner can legitimately change row delivery order within a fragment, exposing the latent assumption and failing the test.
- unnest_order_by_list_test.groovy: qt_window_function_order_by_unnested_value has a window RANK() over UNNEST without an outer ORDER BY. Switch to order_qt_* (framework sorts the actual rows before comparing) and re-sort the corresponding .out block so the expected side is also in sorted order.
- test_python_udaf_complex.groovy: qt_json_array_agg → order_qt_json_array_agg. The query GROUP BY category already returns rows in alphabetical category order, so no .out change is needed.

Note: this only stabilises row order; the python UDAF's per-group array contents still depend on row arrival order inside each group, so a stricter pin (ORDER BY id in a subquery or array_sort around the agg) would still be needed if that variability resurfaces.
run buildall
TPC-H: Total hot run time: 31839 ms
…E-planned LE

Two correctness issues in the FE-planned local-shuffle path, both surfaced by single-tablet POOLING / share-scan fragments.

1. FE planner inserted LE(LOCAL_HASH) below a streaming partial agg with distributeExprLists = child table distribution (e.g. [id]) instead of grouping_exprs (e.g. [category]). BE's AggSinkOperatorX / StreamingAggOperatorX::update_operator picks _partition_exprs = grouping_exprs when the chain is not followed_by_shuffled_operator (the common case for a streaming preagg at fragment root with only a cross-fragment HASH ExchangeSink above). Using child distribution scattered same-group rows across N partial-agg instances, turning the preagg into a no-op and breaking row-arrival order at the downstream merge-finalize (manifests as non-deterministic group_concat / py_json_array_agg output, e.g. test_python_udaf_complex json_array_agg).

   Fix: add overridable PlanNode#getLocalExchangeDistributeExprs(childIndex, followedByShuffled) defaulting to the child's distribution, and override it on AggregationNode to mirror BE's update_operator: use child distribution only when (followedByShuffled || hasDistinct); otherwise use grouping_exprs.

2. BE _create_deferred_local_exchangers used sender_count = upstream_pipe->num_tasks() with no max(_, _num_instances). When the upstream pipeline has a serial source (POOLING OlapScan, serial Exchange), num_tasks() stays at 1 and _propagate_local_exchange_num_tasks Pass 1 deliberately does not raise it, but the shared exchanger is shared across all _num_instances fragment instances on this BE. Each instance closes once, so total close-count = _num_instances. Initial 1 minus _num_instances closes drove _running_sink_operators negative (e.g. -5 for 6 instances, -15 for 16), so the exchanger never reached "all senders done", downstream sources blocked on SHUFFLE_DATA_DEPENDENCY forever, and the query hung. Fragments hold block references through the hang; on BE shutdown mem_tracker_limiter::~MemTrackerLimiter fired FATAL, aborting BE and producing the build-948971 "stop grace fail", the root cause being dictionary_p0.test_dict_load_and_get_ip_trie's refresh dictionary running scan + LE(PASSTHROUGH) + cross-fragment DICTIONARY_SINK.

   Fix: mirror BE-planned _add_local_exchange_impl (~line 1023) which uses std::max(cur_pipe->num_tasks(), _num_instances).

Tests
- New LocalExchangePlannerTest#testStreamingAggHashShuffleUsesGroupingExprs: with t1 DISTRIBUTED BY HASH(k1) and SELECT k2, count(*) GROUP BY k2 (k2 non-bucket -> two-phase agg), asserts the LE below the streaming partial agg carries [k2] (grouping key) not [k1] (child distribution). Verified failing pre-fix, passing post-fix. Whole class (26 tests) green.
- Local cluster (output_test, 29030): group_concat probe stable 1,2,3,4,5 across 20 runs after both fixes; matches BE-planner=false output.
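The sender-count arithmetic behind the second issue is easy to check by hand. The sketch below models it in Java purely for illustration (the actual code is BE-side C++); the method names are invented, and the model assumes every fragment instance decrements the shared counter exactly once on close, as the commit message states.

```java
/** Arithmetic model of the shared exchanger's running-sender counter. */
final class SenderCountSketch {

    /** Pre-fix: the counter starts at the upstream pipeline's task count. */
    static int buggySenderCount(int upstreamNumTasks, int numInstances) {
        return upstreamNumTasks;
    }

    /** Post-fix: mirror std::max(cur_pipe->num_tasks(), _num_instances). */
    static int fixedSenderCount(int upstreamNumTasks, int numInstances) {
        return Math.max(upstreamNumTasks, numInstances);
    }

    /** Counter value after each of the numInstances instances has closed once. */
    static int remainingAfterAllClose(int initialSenderCount, int numInstances) {
        return initialSenderCount - numInstances;
    }

    public static void main(String[] args) {
        // Serial source: upstream num_tasks() stays at 1.
        System.out.println(remainingAfterAllClose(buggySenderCount(1, 6), 6));   // prints -5
        System.out.println(remainingAfterAllClose(buggySenderCount(1, 16), 16)); // prints -15
        System.out.println(remainingAfterAllClose(fixedSenderCount(1, 6), 6));   // prints 0
    }
}
```

Only the fixed variant lands on exactly zero, which is the condition the exchanger needs to report that all senders are done.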
run buildall
What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary:
Move local exchange (LE) planning from BE's `_plan_local_exchange` (pipeline build time) to a new FE-side planner. The FE planner mirrors BE semantics, brings several correctness fixes, and is gated by a session variable so the legacy BE path stays available as a fallback.

Core design
- `AddLocalExchange` pass runs after `DistributePlanner`, walking each fragment's plan tree bottom-up via the polymorphic `PlanNode.enforceAndDeriveLocalExchange()`. Each node declares what distribution it requires of its children; the framework inserts `LocalExchangeNode` where needed.
- `LocalExchangeNode` represents intra-fragment data redistribution and supports PASSTHROUGH, GLOBAL/LOCAL/BUCKET HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, NOOP.
- `maxPerBeInstances` (max pipeline instances assigned to any single BE) is used instead of global instance count to match BE's `_num_instances` check. Planning is a no-op when `maxPerBeInstances == 1`.
- A serial operator feeding a non-serial parent gets a PASSTHROUGH LE, matching BE's `required_data_distribution()` rule.
- `LocalExchangeTypeRequire`: `RequireHash` adapts to any hash flavour, `RequireSpecific` preserves the exact requested type.

AggregationNode correctness fixes (DORIS-25413)
PR #62438 introduced a semantic split for `required_data_distribution=HASH` (correctness-required vs performance-only). BE's `!_needs_finalize && !enable_local_exchange_before_agg → base` early-return conflates both intents in `AggSinkOperatorX` and `DistinctStreamingAggOperatorX`, wrongly catching FIRST_MERGE (correctness) / non-streaming dedup (correctness) and producing PASSTHROUGH-over-serial-child → wrong aggregation results. The FE planner adds the missing `!isMerge()` / `useStreamingPreagg=true` guards so FIRST_MERGE and non-streaming dedup always emit HASH, regardless of the flag. Also adds `requiresShuffleForCorrectness()` (mirrors BE's `is_shuffled_operator()`) so SetOperationNode propagates the "downstream depends on hash" flag correctly through chains.

Session variables
- `enable_local_shuffle_planner` (default true): use FE planner; when false, BE plans LE itself via the legacy path.
- `enable_local_shuffle`: master switch.
- `enable_local_exchange_before_agg`: mirrors [Improvement](agg) Add a knob to control local exchange #62438.

Architectural notes
This PR puts the FE planner in the driver's seat for LE insertion but intentionally keeps BE-side machinery as a fallback:
- `is_serial_operator` is still computed on both sides; any future change to BE's per-operator C++ override must be mirrored in FE.
- The legacy BE planner (`pipeline_fragment_context.cpp::_plan_local_exchange`) is preserved and gated by `runtime_state.h::plan_local_shuffle()`; the two paths are mutually exclusive.
- `_propagate_local_exchange_num_tasks` is kept as a runtime safety net for paired-pipeline num_tasks mismatches.

Build fixes (cross-toolchain portability)
- `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
- `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`) on macOS.
- `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` → `TLocalPartitionType::NOOP` after thrift enum rename.

Release note
Add session variable `enable_local_shuffle_planner` (default true) to control whether local exchange nodes are planned in FE (new path) or in BE (legacy `_plan_local_exchange`). The two paths are mutually exclusive; the legacy path remains intact behind this flag.

Check List (For Author)
Test
Behavior changed:
`enable_local_shuffle_planner=true` (default). Plan shapes (LOCAL_EXCHANGE_NODE in TPlanNode) and exchange counts may differ from the legacy BE-planned path, but query results remain equivalent. Setting `enable_local_shuffle_planner=false` restores the legacy behavior bit-for-bit.
Does this need documentation?
`enable_local_shuffle_planner` should be added to the documentation; doc PR will be filed separately.

Check List (For Reviewer who merge this PR)