[fix](local shuffle) fix bucket shuffle exchange incorrectly marked as serial in pooling mode #62054

Closed
924060929 wants to merge 4 commits into apache:master from 924060929:opt_bucket_shuffle_exchange

Conversation

@924060929
Contributor

@924060929 924060929 commented Apr 2, 2026

What problem does this PR solve?

Issue Number: close #xxx

Problem Summary:

When a fragment has pooled scans (ignore_data_distribution enabled), BUCKET_SHUFFLE exchanges were incorrectly treated as serial on the BE side, because of the hasSerialScanNode() check in ExchangeNode.isSerialOperator(). This reduced the exchange's receivers to a single instance per BE instead of multiple instances matching the bucket count, significantly hurting parallelism and query performance (e.g., the TPCH Q9 lineitem+orders bucket shuffle join).

Root cause: The seriality of an exchange should depend solely on the exchange type and use_serial_exchange setting — not on whether the downstream scan is pooled. Scan pooling is handled independently by the scan pipeline's local shuffle mechanism.

Changes

FE changes:

  1. ExchangeNode.isSerialOperator(): Remove the hasSerialScanNode() guard. Only UNPARTITIONED exchanges, or exchanges with use_serial_exchange=true, are serial; BUCKET_SHUFFLE exchanges are never serial.

  2. DistributePlanner.filterInstancesWhichCanReceiveDataFromRemote(): Use isSerialOperator() && useSerialSource() to decide whether to dedupe receivers to 1 per BE, instead of checking LocalShuffleAssignedJob type. Preserve the independent enableShareHashTableForBroadcastJoin branch for broadcast join shared hash table optimization (consistent with Coordinator.computeFragmentExecParams()).

  3. DistributePlanner.sortDestinationInstancesByBuckets(): For non-serial exchanges with LocalShuffleBucketJoinAssignedJob, use getAssignedJoinBucketIndexes() (the distributed join bucket assignments) to determine sender destinations, consistent with ThriftPlansBuilder.computeBucketIdToInstanceId() on the receiver side. Previously, it always used scan source buckets which are pooled to one instance per BE, causing a sender-receiver bucket mapping mismatch.
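The seriality and dedupe rules from points 1 and 2 can be condensed into a small sketch. Method and flag names here are illustrative only, not the actual Doris FE API:

```java
// Hedged sketch of the new FE seriality rules; names are illustrative.
public class ExchangeSerialitySketch {
    // Change 1: seriality is operator-level only -- no hasSerialScanNode() guard.
    static boolean isSerialOperator(boolean unpartitioned, boolean useSerialExchange) {
        return unpartitioned || useSerialExchange;
    }

    // Change 2: dedupe receivers to one per BE only when both the operator-level
    // predicate and the fragment-level pooling guard (useSerialSource) hold.
    static boolean dedupeToOnePerBe(boolean unpartitioned, boolean useSerialExchange,
                                    boolean fragmentUsesSerialSource) {
        return isSerialOperator(unpartitioned, useSerialExchange) && fragmentUsesSerialSource;
    }

    public static void main(String[] args) {
        // A BUCKET_SHUFFLE exchange (not UNPARTITIONED, no use_serial_exchange)
        // is never serial, even when the downstream scan is pooled.
        assert !isSerialOperator(false, false);
        assert !dedupeToOnePerBe(false, false, true);
        // An UNPARTITIONED exchange in a pooled fragment still dedupes.
        assert dedupeToOnePerBe(true, false, true);
    }
}
```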

BE changes:

  1. pipeline_fragment_context.cpp: For non-serial BUCKET_SHUFFLE exchanges, compute the set of destination instances from bucket_seq_to_instance_idx values. Pass this to ExchangeSourceOperatorX.

  2. exchange_source_operator: Padding instances (the extra instances on a BE where num_instances > bucket_count, which receive no bucket assignment) create their VDataStreamRecvr with num_senders=0 so they return EOS immediately instead of waiting indefinitely for senders that will never target them.
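The padding-instance rule in point 2 amounts to: an instance index that never appears among the values of bucket_seq_to_instance_idx gets zero senders. A minimal sketch (the actual BE code is C++; names here are illustrative):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the padding-instance detection described above.
public class PaddingInstanceSketch {
    // Returns the sender count to use when creating the receiver for one instance:
    // real receivers keep the true sender count, padding instances get 0.
    static int numSendersFor(int instanceIdx,
                             Map<Integer, Integer> bucketSeqToInstanceIdx,
                             int realNumSenders) {
        Set<Integer> destInstances = new HashSet<>(bucketSeqToInstanceIdx.values());
        return destInstances.contains(instanceIdx) ? realNumSenders : 0;
    }

    public static void main(String[] args) {
        // 3 buckets mapped to instances 0..2; instance 3 is padding
        // (num_instances > bucket count on this BE).
        Map<Integer, Integer> b2i = Map.of(0, 0, 1, 1, 2, 2);
        assert numSendersFor(1, b2i, 8) == 8; // real receiver
        assert numSendersFor(3, b2i, 8) == 0; // padding: returns EOS immediately
    }
}
```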

Release note

None

Check List (For Author)

  • Test:
    • Unit Test (BucketShuffleDestinationTest, ExchangeNodeSerialTest, exchange_source_operator_test)
    • Manual test (3-BE cluster, all combinations: serial/non-serial × share_hash_table on/off × parallel_pipeline_task_num 0-8)
  • Behavior changed: No
  • Does this need documentation: No

@Thearas
Contributor

Thearas commented Apr 2, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@924060929 924060929 force-pushed the opt_bucket_shuffle_exchange branch from e935b63 to 4c3e765 Compare April 2, 2026 14:11
@924060929
Contributor Author

run buildall

…s serial in pooling mode

1. ExchangeNode.toThrift: is_serial = isSerialOperator() && useSerialSource().
   isSerialOperator() is pure operator-level: only UNPARTITIONED or use_serial_exchange.
   useSerialSource() is the fragment-level pooling guard, kept in toThrift (not in
   isSerialOperator) to avoid infinite recursion: useSerialSource() calls
   planRoot.isSerialOperator() internally.

2. DistributePlanner.filterInstancesWhichCanReceiveDataFromRemote(): use
   linkNode.isSerialOperator() && linkNode.getFragment().useSerialSource() to decide
   whether to dedupe to first-per-worker destinations. Bucket shuffle / hash exchanges
   are never serial — scan seriality is handled by the scan pipeline independently.

3. Add unit tests for both changes.
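The recursion concern in point 1 can be made concrete with a sketch (interface and method names are illustrative, not the Doris FE API): keeping the fragment-level guard in toThrift() means isSerialOperator() never calls back into useSerialSource(), which itself consults planRoot.isSerialOperator().

```java
// Hedged sketch of the toThrift() split described above.
public class ToThriftSerialSketch {
    interface Fragment { boolean useSerialSource(); }

    // Pure operator-level predicate; must not call back into the fragment,
    // or useSerialSource() -> planRoot.isSerialOperator() would recurse.
    static boolean isSerialOperator(boolean unpartitioned, boolean useSerialExchange) {
        return unpartitioned || useSerialExchange;
    }

    // Fragment-level pooling guard applied only at serialization time.
    static boolean thriftIsSerial(boolean unpartitioned, boolean useSerialExchange,
                                  Fragment fragment) {
        return isSerialOperator(unpartitioned, useSerialExchange) && fragment.useSerialSource();
    }

    public static void main(String[] args) {
        Fragment pooled = () -> true;
        assert !thriftIsSerial(false, false, pooled); // bucket shuffle: never serial
        assert thriftIsSerial(true, false, pooled);   // UNPARTITIONED in a pooled fragment
    }
}
```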
@924060929 924060929 force-pushed the opt_bucket_shuffle_exchange branch from 4c3e765 to 122110b Compare April 2, 2026 16:09
@924060929
Contributor Author

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 66.67% (6/9) 🎉
Increment coverage report
Complete coverage report

@doris-robot

TPC-H: Total hot run time: 16520 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 122110b98440a485a3f5683e5e0ca75e8eb35d1d, data reload: false

------ Round 1 ----------------------------------
============================================
q1	4565	3646	3602	3602
q2	
q3	3533	656	663	656
q4	843	340	355	340
q5	2939	1295	1124	1124
q6	289	150	123	123
q7	
q8	7862	1261	1161	1161
q9	
q10	4624	1758	1730	1730
q11	370	266	254	254
q12	615	676	499	499
q13	2451	2758	2138	2138
q14	276	285	254	254
q15	
q16	837	816	766	766
q17	946	996	872	872
q18	
q19	5910	960	721	721
q20	
q21	3102	1967	2050	1967
q22	436	357	313	313
Total cold run time: 39598 ms
Total hot run time: 16520 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	3896	3883	3878	3878
q2	
q3	5073	4329	4305	4305
q4	1936	2047	1325	1325
q5	4921	5014	5047	5014
q6	178	154	119	119
q7	
q8	8733	2919	2931	2919
q9	
q10	4903	4131	4189	4131
q11	529	372	368	368
q12	647	684	477	477
q13	2417	2825	2123	2123
q14	273	275	249	249
q15	
q16	730	741	672	672
q17	1188	1153	1150	1150
q18	
q19	4842	885	856	856
q20	
q21	5346	5079	4569	4569
q22	513	463	401	401
Total cold run time: 46125 ms
Total hot run time: 32556 ms

@doris-robot

TPC-DS: Total hot run time: 73983 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit 122110b98440a485a3f5683e5e0ca75e8eb35d1d, data reload: false

query5	4341	638	506	506
query6	348	231	218	218
query7	4244	554	336	336
query8	337	238	226	226
query9	8723	3897	3901	3897
query10	524	405	362	362
query11	6808	5522	5127	5127
query12	
query13	1328	430	418	418
query14	5823	4812	4726	4726
query14_1	
query15	354	199	192	192
query16	
query17	
query18	
query19	287	200	194	194
query20	
query21	279	118	116	116
query22	
query23	
query23_1	
query24	
query24_1	
query25	
query26	868	187	178	178
query27	3130	380	371	371
query28	3040	1891	1916	1891
query29	
query30	349	209	197	197
query31	
query32	357	76	78	76
query33	544	307	304	304
query34	2958	677	648	648
query35	792	687	649	649
query36	1246	1190	1092	1092
query37	350	102	88	88
query38	
query39	972	845	855	845
query39_1	838	830	837	830
query40	291	138	146	138
query41	76	63	63	63
query42	308	285	267	267
query43	375	271	280	271
query44	
query45	211	193	195	193
query46	
query47	
query48	753	319	313	313
query49	668	460	453	453
query50	1156	246	221	221
query51	4361	4395	4193	4193
query52	279	285	271	271
query53	333	356	272	272
query54	420	298	309	298
query55	102	97	98	97
query56	338	338	305	305
query57	
query58	366	279	282	279
query59	3292	2716	2728	2716
query60	348	328	311	311
query61	157	151	153	151
query62	707	575	550	550
query63	306	267	273	267
query64	6664	1320	1080	1080
query65	
query66	2198	488	384	384
query67	
query68	
query69	419	317	298	298
query70	1023	1004	999	999
query71	359	348	315	315
query72	
query73	1217	459	440	440
query74	
query75	3814	2986	2985	2985
query76	2659	994	863	863
query77	396	403	340	340
query78	11344	11202	10690	10690
query79	1522	1166	856	856
query80	824	746	669	669
query81	456	287	240	240
query82	1320	156	121	121
query83	368	296	274	274
query84	314	153	120	120
query85	988	514	466	466
query86	396	341	322	322
query87	
query88	3487	2719	2692	2692
query89	507	368	370	368
query90	1886	176	167	167
query91	184	143	161	143
query92	77	73	71	71
query93	897	825	503	503
query94	
query95	756	432	366	366
query96	1110	373	311	311
query97	2700	2540	2521	2521
query98	
query99	1245	982	962	962
Total cold run time: 146059 ms
Total hot run time: 73983 ms

924060929 added a commit to 924060929/incubator-doris that referenced this pull request Apr 3, 2026
…in pooling mode

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: apache#62054

Problem Summary:

When ExchangeNode.isSerialOperator() is changed to not include hasSerialScanNode()
(PR apache#62054), non-serial BUCKET_SHUFFLE exchanges with pooled scans cause query hangs.

Two issues are fixed:

1. **FE: Sender destination mismatch** - `sortDestinationInstancesByBuckets()` used
   scan source buckets (`bucketIndexToScanNodeToTablets`) which are pooled to one
   instance per BE, causing ALL bucket data to be sent to just 1 instance per BE.
   But `computeBucketIdToInstanceId()` (receiver side) uses join bucket assignments
   (`getAssignedJoinBucketIndexes()`) which distribute buckets across instances.
   Fix: Use `getAssignedJoinBucketIndexes()` for `LocalShuffleBucketJoinAssignedJob`
   in non-serial mode, consistent with `computeBucketIdToInstanceId()`.

2. **BE: Padding instance hang** - When num_instances > bucket count on a BE,
   extra "padding" instances create VDataStreamRecvrs that never receive EOS.
   Fix: Identify padding instances (not in `bucket_seq_to_instance_idx` values)
   and create their recvrs with num_senders=0 so they complete immediately.
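The sender-side fix in point 1 boils down to building the bucket-to-instance map from the join bucket assignments rather than the pooled scan-source buckets, so it matches what the receiver side computes. A minimal sketch with illustrative names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Hedged sketch: order sender destinations by the distributed join bucket
// assignments (per-instance bucket sets), not by pooled scan-source buckets.
public class BucketDestinationSketch {
    // assignedJoinBuckets.get(i) = bucket ids handled by instance i on this BE.
    static int[] bucketToInstance(List<Set<Integer>> assignedJoinBuckets, int bucketNum) {
        int[] dest = new int[bucketNum];
        Arrays.fill(dest, -1); // -1: bucket not assigned on this BE
        for (int inst = 0; inst < assignedJoinBuckets.size(); inst++) {
            for (int bucket : assignedJoinBuckets.get(inst)) {
                dest[bucket] = inst;
            }
        }
        return dest;
    }

    public static void main(String[] args) {
        // 4 buckets spread across 2 instances, instead of all pooled into instance 0.
        List<Set<Integer>> assigned = List.of(Set.of(0, 2), Set.of(1, 3));
        assert Arrays.equals(bucketToInstance(assigned, 4), new int[] {0, 1, 0, 1});
    }
}
```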

### Release note

None

### Check List (For Author)

- Test: Manual test with 3-BE cluster, all combinations of serial/non-serial,
  share_hash_table on/off, parallel_pipeline_task_num 0-8
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@924060929 924060929 force-pushed the opt_bucket_shuffle_exchange branch from baeaebc to d2aa5c4 Compare April 3, 2026 09:06
@924060929
Contributor Author

run buildall

@doris-robot

BE UT Coverage Report

Increment line coverage 26.32% (15/57) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.97% (20072/37894)
Line Coverage 36.54% (188498/515853)
Region Coverage 32.82% (146489/446342)
Branch Coverage 33.94% (64082/188814)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 42.11% (8/19) 🎉
Increment coverage report
Complete coverage report

@924060929
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment


1 finding

  • fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/BucketShuffleDestinationTest.java: the new unit test is wired to the pre-change private API and does not actually execute the code path it is meant to validate.

Critical checkpoints

  • Goal of the task: Partially achieved. The FE/BE logic change for non-serial BUCKET_SHUFFLE exchanges looks internally consistent with ThriftPlansBuilder.computeBucketIdToInstanceId(), but the added FE test is broken, so the PR does not currently provide working proof for this fix.
  • Scope/minimality: Yes. The production changes are focused on exchange seriality and bucket-to-instance routing.
  • Concurrency: Reviewed the BE dependency/sender-count changes in exchange_source_operator and pipeline_fragment_context; I did not find a concrete lock-order or readiness regression in the inspected paths.
  • Lifecycle/static initialization: Reviewed the recvr reset/close path; no static-init issue found, and the recvr lifecycle still appears bounded.
  • Configuration: No new config items.
  • Compatibility: No incompatible FE/BE protocol or storage-format change identified in the reviewed paths.
  • Parallel code paths: Checked the FE destination ordering against ThriftPlansBuilder.computeBucketIdToInstanceId() and the BE bucket_seq_to_instance_idx handling; those paths are aligned after this change.
  • Special conditional checks: The new serial-exchange guard and padding-instance handling are documented well enough to follow.
  • Test coverage: Not sufficient yet. One newly added FE test is stale and will fail before exercising the target logic.
  • Observability: Sufficient for this change; logging adjustments are minor.
  • Transaction/persistence: Not applicable.
  • Data write/atomicity: Not applicable.
  • FE-BE variable passing: Checked the relevant bucket_seq_to_instance_idx flow; no concrete mismatch found.
  • Performance: The change addresses the reported parallelism regression; I did not find a new obvious hot-path issue beyond the broken test.
  • Other issues: None beyond the test failure below.

@924060929 924060929 force-pushed the opt_bucket_shuffle_exchange branch from d2aa5c4 to d4ec761 Compare April 3, 2026 11:11
@924060929
Contributor Author

/review

@924060929
Contributor Author

run buildall

Contributor

@github-actions github-actions bot left a comment


Found 1 issue.

  1. High: DistributePlanner.filterInstancesWhichCanReceiveDataFromRemote() now drops the enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin() branch, so Nereids no longer collapses broadcast-build receivers to one instance per BE. PhysicalPlanTranslator still marks those exchanges, and the legacy coordinator still relies on that contract to let sibling instances share a single build hash table. With this patch, ordinary broadcast joins can start broadcasting the build side to every local pipeline instance again, which regresses the shared-hash-table optimization outside the bucket-shuffle bug being fixed.

Critical checkpoint conclusions:

  • Goal of current task: Partially accomplished. The bucket-shuffle seriality/routing fix looks correct for the targeted path, and the added FE tests cover that path, but the broader receiver-selection change regresses shared broadcast-hash-table planning.
  • Small, clear, focused modification: Mostly focused, but filterInstancesWhichCanReceiveDataFromRemote() now changes broadcast-join behavior unrelated to the reported bucket-shuffle issue.
  • Concurrency: Applicable in the BE exchange changes. I did not find a new lock-order or atomic-order bug in the patched BE paths.
  • Lifecycle/static initialization: Applicable in the BE receiver lifecycle. The new zero-sender receiver lifecycle appears internally consistent; no static-init issue found.
  • Configuration items: No new configs added.
  • Compatibility: No FE/BE protocol or storage compatibility issue identified in the reviewed paths.
  • Functionally parallel code paths: Not fully handled. The legacy coordinator still preserves one-receiver-per-BE behavior for shared broadcast hash tables, but the Nereids planner path removed the equivalent branch.
  • Special conditional checks: The new serial-exchange comments are clear enough.
  • Test coverage: Insufficient for the full behavior change. Added FE tests cover bucket-shuffle seriality and bucket routing, but there is no test for enableShareHashTableForBroadcastJoin on a broadcast join, and no BE test for the new zero-sender receiver path.
  • Observability: Adequate. Logging changes are minor and do not block review.
  • Transaction/persistence: Not applicable.
  • Data writes/modifications: Not applicable.
  • New FE-BE variables passed across all paths: No missing new variable path found in the reviewed changes.
  • Performance: Regression found in the shared broadcast-hash-table path described above.
  • Other issues: None beyond the item above.

Overall opinion: needs follow-up before merge because the FE receiver-selection predicate now regresses an existing optimization on broadcast joins unrelated to bucket shuffle.

return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
} else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
// isSerialOperator(): UNPARTITIONED or use_serial_exchange (operator-level)
// useSerialSource(): fragment is in pooling mode (fragment-level guard)
Contributor


filterInstancesWhichCanReceiveDataFromRemote() used to collapse destinations not only for local-shuffle serial exchanges, but also when enableShareHashTableForBroadcastJoin and linkNode.isRightChildOfBroadcastHashJoin() were true. That second branch is still part of the execution contract: PhysicalPlanTranslator still marks the right side of broadcast joins (fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:1705-1710), and the legacy coordinator still sends only one destination per BE while the sibling instances share that hash table (fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:1451-1470, 1594-1614). After this change, Nereids will add every local instance as a destination for those exchanges, so the build side gets broadcast to every pipeline instance again. That reintroduces the per-instance network/build cost the shared-hash-table optimization was avoiding for ordinary broadcast joins, even though this PR is only trying to relax bucket-shuffle seriality. Please preserve the broadcast-join dedupe branch here (or fold it into the new predicate) alongside the bucket-shuffle fix.
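The contract this comment describes can be sketched as a single receiver-selection predicate, with the broadcast shared-hash-table branch preserved alongside the new serial-exchange check (names are illustrative, not the actual Doris FE API):

```java
// Hedged sketch of the receiver-selection contract described in the review comment.
public class ReceiverSelectionSketch {
    // true = collapse destinations to one receiver instance per BE.
    static boolean onePerBe(boolean serialOperator, boolean useSerialSource,
                            boolean shareHashTableForBroadcastJoin,
                            boolean rightChildOfBroadcastJoin) {
        if (serialOperator && useSerialSource) {
            return true; // serial exchange in a pooled fragment
        }
        // Preserved branch: broadcast build side with a shared hash table
        // must still dedupe, independently of the serial-exchange predicate.
        return shareHashTableForBroadcastJoin && rightChildOfBroadcastJoin;
    }

    public static void main(String[] args) {
        assert onePerBe(false, false, true, true);   // broadcast build side still dedupes
        assert !onePerBe(false, true, false, false); // bucket shuffle: all instances receive
        assert onePerBe(true, true, false, false);   // serial exchange in pooled fragment
    }
}
```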

@924060929 924060929 force-pushed the opt_bucket_shuffle_exchange branch from d4ec761 to aa0752f Compare April 3, 2026 11:34
@924060929
Contributor Author

run buildall

@924060929
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment


Found 1 issue.

  1. be/src/exec/pipeline/pipeline_fragment_context.cpp: the new dest_instances set is built from bucket_seq_to_instance_idx, whose values are FE fragment-instance indexes (ThriftPlansBuilder.instanceToIndex()), but ExchangeLocalState later checks membership with info.task_idx, which is the BE pipeline task index. Those two index spaces are only equal while parallel_instances == 1. In pooled fragments the BE creates multiple tasks per fragment instance, so any task with task_idx >= fragment_instance_count is now misclassified as a padding instance, gets num_senders=0, and returns EOS immediately. That drops valid receivers for real fragment instances instead of just the extra local-shuffle tasks. Please key this logic by fragment instance index / sender id rather than pipeline task index, or derive the destination set in BE terms before checking it.

Critical checkpoint conclusions:

  • Goal / correctness: The PR clearly aims to stop BUCKET_SHUFFLE from being forced serial in pooled fragments and to avoid hangs from padding receivers. The FE routing fix is consistent with computeBucketIdToInstanceId(), but the BE destination-instance check is keyed to the wrong index space, so the end-to-end fix is not yet correct. Existing tests cover FE seriality and bucket routing, but there is no BE test for the new padding-instance path.
  • Change size / focus: The change is reasonably focused on exchange seriality and receiver routing.
  • Concurrency: The touched BE paths are mostly existing dependency/receiver flows; I did not find a new lock-order issue in the changed code. The main problem is incorrect task classification, not locking.
  • Lifecycle / initialization: No new special lifecycle or static-init issue identified.
  • Config / compatibility: No new config or storage compatibility issue identified.
  • Parallel paths: The review checked FE destination building and BE receiver creation together; that cross-path consistency is exactly where the bug appears.
  • Conditional checks: The new conditional around destination instances lacks the needed invariant that FE instance indexes must match BE task indexes; today that invariant does not hold in pooled execution.
  • Test coverage: FE unit tests were added, but BE coverage is missing for the new num_senders=0 path and for pooled multi-task fragments.
  • Observability: Logging changes are acceptable; no extra observability seems required for this small fix.
  • Transaction / persistence / data writes: Not applicable.
  • FE-BE variable passing: The existing FE->BE variables are used consistently, but interpreted inconsistently on BE in the new code.
  • Performance: The intended performance improvement is valid, but the index mismatch can suppress legitimate receivers and break execution.
  • Other issues: None beyond the incorrect index-space assumption above.

tnode.exchange_node.partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
!(tnode.__isset.is_serial_operator && tnode.is_serial_operator)) {
std::unordered_set<int> dest_instances;
for (const auto& [bucket, idx] : _params.bucket_seq_to_instance_idx) {
Contributor


bucket_seq_to_instance_idx stores FE fragment-instance indexes (ThriftPlansBuilder.instanceToIndex()), but later ExchangeLocalState checks this set with info.task_idx, which is the BE pipeline task index. In pooled fragments those are different once parallel_instances > 1, so real receiver tasks can be treated as padding tasks and created with num_senders=0. This makes them return EOS immediately instead of receiving remote data. The destination set here needs to be expressed in the same index space that is_destination_instance() will use, or the later check needs to compare against fragment-instance identity rather than task index.

Contributor Author


This concern does not apply here. Let me explain:

  1. bucket_seq_to_instance_idx values are 0-based local instance indices per worker (from ThriftPlansBuilder.instanceToIndex() at line 421-422).

  2. task_idx equals instance_idx — the loop index into _params.local_params passed to PipelineTask constructor at line 483.

  3. In our specific case (non-serial BUCKET_SHUFFLE exchange), the exchange source operator's is_serial_operator() returns false, so add_operator(op, _parallel_instances) at line 1362 does NOT call set_num_tasks(parallelism) (see pipeline.cpp:77). The pipeline keeps num_tasks = _num_instances.

  4. Since num_tasks > 1, tasks are created for ALL instance_idx from 0 to _num_instances-1 (line 435: pipeline->num_tasks() > 1 || instance_idx == 0).

  5. Therefore task_idx == instance_idx for every task, which is the same index space as bucket_seq_to_instance_idx values.

The parallel_instances field is set to 1 for pooled fragments (ignoreDataDistribution() at line 503), but it only takes effect when is_serial_operator() returns true (pipeline.cpp:77). For our non-serial exchange, parallel_instances has no effect on task creation.
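The argument above can be sketched in simplified form (illustrative names; the real code is C++ in pipeline.cpp / pipeline_fragment_context.cpp): for a non-serial operator the task count stays equal to the instance count, so task indexes and instance indexes coincide.

```java
// Hedged, simplified sketch of the task_idx == instance_idx claim.
public class TaskIndexSketch {
    // For non-serial operators, set_num_tasks(parallel_instances) does not apply:
    // the pipeline keeps one task per local_params entry.
    static int[] createTaskIndexes(int numInstances, boolean serialOperator,
                                   int parallelInstances) {
        int numTasks = serialOperator ? parallelInstances : numInstances;
        int[] taskIdx = new int[numTasks];
        for (int instanceIdx = 0; instanceIdx < numTasks; instanceIdx++) {
            taskIdx[instanceIdx] = instanceIdx; // task_idx == instance_idx
        }
        return taskIdx;
    }

    public static void main(String[] args) {
        // Pooled fragment: parallel_instances == 1, but the non-serial exchange
        // still gets one task per instance, indexed 0..numInstances-1.
        int[] idx = createTaskIndexes(4, false, 1);
        assert idx.length == 4 && idx[3] == 3;
    }
}
```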

@doris-robot

TPC-H: Total hot run time: 28693 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit aa0752f2017381ee58f0b66d68c8f841acb19208, data reload: false

------ Round 1 ----------------------------------
============================================
q1	4570	3683	3628	3628
q2	
q3	958	858	612	612
q4	887	456	363	363
q5	1781	1345	1148	1148
q6	187	168	135	135
q7	942	936	762	762
q8	1148	1162	1259	1162
q9	5474	5493	5191	5191
q10	2192	1996	1790	1790
q11	422	300	264	264
q12	354	424	285	285
q13	2502	2812	2178	2178
q14	281	276	260	260
q15	
q16	836	850	786	786
q17	1064	1181	777	777
q18	6416	5745	5546	5546
q19	1012	1233	1122	1122
q20	534	407	286	286
q21	3122	2155	2072	2072
q22	447	393	326	326
Total cold run time: 35129 ms
Total hot run time: 28693 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	3931	3917	3900	3900
q2	
q3	4611	4701	4093	4093
q4	1933	2048	1341	1341
q5	4879	4888	5131	4888
q6	192	166	130	130
q7	1962	1786	1573	1573
q8	3257	3003	3016	3003
q9	8022	7813	7862	7813
q10	4414	4421	4223	4223
q11	574	405	373	373
q12	658	698	481	481
q13	2450	2850	2153	2153
q14	286	291	259	259
q15	
q16	755	765	696	696
q17	1230	1196	1144	1144
q18	7850	6927	6947	6927
q19	1092	1091	1065	1065
q20	2182	2178	1905	1905
q21	5854	5186	4640	4640
q22	531	480	417	417
Total cold run time: 56663 ms
Total hot run time: 51024 ms

@doris-robot

TPC-DS: Total hot run time: 177299 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit aa0752f2017381ee58f0b66d68c8f841acb19208, data reload: false

query5	5125	669	507	507
query6	352	224	204	204
query7	4275	623	337	337
query8	334	243	220	220
query9	8738	3925	3929	3925
query10	526	369	306	306
query11	7515	5509	5143	5143
query12	212	140	143	140
query13	1290	633	458	458
query14	8090	5186	4743	4743
query14_1	4132	4142	4172	4142
query15	270	208	196	196
query16	1030	470	444	444
query17	1357	782	671	671
query18	2798	506	387	387
query19	362	211	169	169
query20	148	132	131	131
query21	231	158	123	123
query22	13655	13654	13509	13509
query23	17836	16886	16443	16443
query23_1	16519	16604	16566	16566
query24	7053	1769	1381	1381
query24_1	1372	1377	1373	1373
query25	632	527	464	464
query26	824	315	196	196
query27	2564	672	429	429
query28	4457	1965	1905	1905
query29	1004	708	596	596
query30	312	243	209	209
query31	1114	1049	944	944
query32	93	88	76	76
query33	566	376	300	300
query34	1153	1170	682	682
query35	795	774	666	666
query36	1264	1237	1101	1101
query37	153	104	86	86
query38	3090	3032	2963	2963
query39	926	901	847	847
query39_1	832	831	845	831
query40	233	148	138	138
query41	66	62	61	61
query42	112	106	113	106
query43	316	328	281	281
query44	
query45	208	193	188	188
query46	1125	1249	798	798
query47	2350	2374	2213	2213
query48	410	418	306	306
query49	628	538	444	444
query50	762	297	217	217
query51	4327	4201	4266	4201
query52	108	109	96	96
query53	261	272	213	213
query54	319	276	264	264
query55	103	95	91	91
query56	299	323	316	316
query57	1676	1708	1662	1662
query58	309	279	283	279
query59	2917	2972	2707	2707
query60	340	342	324	324
query61	157	152	149	149
query62	681	623	560	560
query63	245	201	197	197
query64	5237	1319	954	954
query65	
query66	1397	479	365	365
query67	24314	24314	24117	24117
query68	
query69	465	350	311	311
query70	1050	1039	984	984
query71	328	281	274	274
query72	3006	2912	2459	2459
query73	798	853	413	413
query74	9844	9686	9560	9560
query75	2743	2613	2333	2333
query76	2313	1136	785	785
query77	402	417	332	332
query78	11290	11382	10806	10806
query79	1536	1061	796	796
query80	693	590	497	497
query81	447	282	242	242
query82	1341	158	122	122
query83	379	300	262	262
query84	270	151	130	130
query85	925	500	439	439
query86	378	354	326	326
query87	3283	3192	3071	3071
query88	3606	2738	2711	2711
query89	460	404	362	362
query90	1891	183	179	179
query91	179	177	146	146
query92	78	74	74	74
query93	962	981	558	558
query94	544	334	295	295
query95	671	457	335	335
query96	1011	786	360	360
query97	2686	2639	2534	2534
query98	240	228	225	225
query99	1091	1078	946	946
Total cold run time: 259990 ms
Total hot run time: 177299 ms

@hello-stephen
Contributor

BE UT Coverage Report

Increment line coverage 26.32% (15/57) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.97% (20072/37894)
Line Coverage 36.54% (188500/515857)
Region Coverage 32.81% (146462/446342)
Branch Coverage 33.95% (64103/188814)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 55.56% (10/18) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 92.98% (53/57) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 71.90% (26683/37110)
Line Coverage 54.83% (281969/514276)
Region Coverage 51.88% (233711/450476)
Branch Coverage 53.35% (101044/189396)

morrySnow previously approved these changes Apr 7, 2026
@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Apr 7, 2026
@github-actions
Contributor

github-actions bot commented Apr 7, 2026

PR approved by at least one committer and no changes requested.

@github-actions
Contributor

github-actions bot commented Apr 7, 2026

PR approved by anyone and no changes requested.

HappenLee

This comment was marked as spam.

    return;
} else {
    _finished_channels.emplace(channel_id);
    if (_working_channels_count.fetch_sub(1) == 1) {
Contributor

@HappenLee HappenLee Apr 7, 2026


Why change this code? Is the change needed?

Contributor Author

@924060929 924060929 Apr 7, 2026


Nothing is removed; the change just makes the code clearer. The remaining count is _working_channels_count.fetch_sub(1) - 1, and checking remaining == 0 reads more meaningfully:

auto remaining = _working_channels_count.fetch_sub(1) - 1;
if (remaining == 0)
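The two forms discussed above are equivalent, since fetch_sub(1) returns the value held *before* the decrement. A minimal self-contained demonstration (the last_channel_* wrappers are illustrative names, not Doris code):

```cpp
#include <atomic>
#include <cassert>

// Old form: test the pre-decrement value directly.
bool last_channel_old_style(std::atomic<int>& working_channels_count) {
    return working_channels_count.fetch_sub(1) == 1;
}

// Refactored form: compute the count remaining after this channel finishes.
bool last_channel_new_style(std::atomic<int>& working_channels_count) {
    auto remaining = working_channels_count.fetch_sub(1) - 1;
    return remaining == 0;
}
```

Both return true exactly once, for the last finishing channel.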

Contributor


The old code seems clearer; the new code looks weird.

Contributor Author


This BE change has been reverted in the latest push. The fix is now FE-only.

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: apache#62054

Problem Summary: Restore the BE-side exchange source and pipeline fragment logic to the state before the padding-instance fix so the branch can be validated with FE-only changes.

### Release note

None

### Check List (For Author)

- Test: No need to test (validation will be done in CI/manual runs after this temporary revert)
- Behavior changed: Yes (temporarily removes the BE-side padding-instance handling for validation)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@924060929 924060929 dismissed stale reviews from HappenLee and morrySnow via d93e51f April 8, 2026 03:19
@924060929
Contributor Author

run buildall

@github-actions github-actions bot removed the approved Indicates a PR has been approved by one committer. label Apr 8, 2026
@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 55.56% (10/18) 🎉
Increment coverage report
Complete coverage report

…change

### What problem does this PR solve?

Problem Summary: When a bucket scan fragment has a non-serial BUCKET_SHUFFLE exchange
(after removing hasSerialScanNode() from ExchangeNode.isSerialOperator()), the BE does
not insert local exchange because the data distribution matches. However, the FE
degreeOfParallelism() in UnassignedScanBucketOlapTableJob uses getTotalTabletsNum()
(total across ALL BEs) to inflate the instance count. This creates "padding" instances
with no bucket assignments that create VDataStreamRecvrs expecting senders that will
never arrive, causing query hangs.

Fix: When the fragment has a non-serial exchange, cap instanceNum at maxParallel
(per-BE bucket count) to avoid creating padding instances.

### Release note

None

### Check List (For Author)

- Test: Manual test on 3-BE cluster, all pptn/serial/shareHT combinations pass
- Behavior changed: No
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@924060929
Contributor Author

run buildall

@924060929
Contributor Author

/review

Contributor

@github-actions github-actions bot left a comment


Found 1 issue.

Critical checkpoint conclusions:

  • Goal of the task: Partially accomplished. The PR fixes the BUCKET_SHUFFLE seriality/destination problem, but the FE-only fallback in UnassignedScanBucketOlapTableJob.degreeOfParallelism() is broader than the bug it describes and can reduce valid fragment parallelism. The added tests do not cover this path.
  • Modification size/focus: Mostly small and focused, except for the new fragment-wide hasNonSerialExchangeInFragment() cap, which changes behavior for more than the BUCKET_SHUFFLE case.
  • Concurrency: No new FE locking/concurrency issues found in the changed code.
  • Lifecycle/static initialization: No special lifecycle or static-init concerns found.
  • Configuration: No new configs.
  • Compatibility: No incompatible FE/BE protocol or storage-format change in this version of the PR.
  • Functionally parallel paths: Not fully handled. Plain HASH_PARTITIONED exchanges share the same non-serial predicate here, but they do not have the same sender/receiver padding problem as BUCKET_SHUFFLE.
  • Special conditional checks: The new hasNonSerialExchangeInFragment() guard is not narrow enough for the stated failure mode.
  • Test coverage: New unit tests cover exchange seriality and bucket destination ordering, but there is no regression test for the new degreeOfParallelism() cap or for pooled bucket-scan fragments with non-BUCKET exchanges.
  • Observability: No additional observability needed for this planner-only change.
  • Transaction/persistence/data writes: Not applicable.
  • FE/BE variable passing: No new variables added.
  • Performance: Regression risk exists because the new cap can unnecessarily cut instance count to per-BE bucket count.
  • Other issues: None beyond the regression above.

Summary opinion: not ready as-is because the new FE-only cap conflates all non-serial exchanges with the specific BUCKET_SHUFFLE padding-instance hang.

// for the exchange pipeline (distribution matches BUCKET_HASH_SHUFFLE). Instances
// beyond per-BE bucket count are "padding" with no bucket assignment — they create
// VDataStreamRecvrs that never receive data and hang. Cap at per-BE bucket count.
if (useLocalShuffleToAddParallel && hasNonSerialExchangeInFragment()) {
Contributor


This guard looks too broad for the hang described in the PR. hasNonSerialExchangeInFragment() is also true for ordinary HASH_PARTITIONED exchanges, but those plans still include every receiver instance in the sender destination list, so the extra local-shuffle instances are not the same kind of untargeted "padding" instances as in BUCKET_SHUFFLE. On the BE side Pipeline::need_to_local_exchange() treats BUCKET_HASH_SHUFFLE and HASH_SHUFFLE as compatible hash distributions (be/src/exec/pipeline/pipeline.cpp:57-72), so a pooled bucket-scan fragment can still legitimately use more instances than its local bucket count when the downstream operator only requires hash shuffle. Capping to maxParallel here would silently reduce that parallelism.

Can this be narrowed to the actual failing case (BUCKET_SHFFULE_HASH_PARTITIONED / bucket-index destinations), with a regression test that proves a pooled bucket-scan fragment with a non-serial HASH_PARTITIONED exchange still keeps its higher instance count?

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 51.72% (15/29) 🎉
Increment coverage report
Complete coverage report

@924060929 924060929 closed this Apr 8, 2026


6 participants