From 73eb57deabb44b9634c4454c733d83807814d19e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jan 2024 07:53:16 +0800 Subject: [PATCH 1/2] [pipelineX](localexchange) Adjust local exchange plan rule --- be/src/pipeline/pipeline.h | 8 ++------ .../org/apache/doris/planner/ScanNode.java | 5 +++-- .../java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++-------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 148191f4e2dd0c..d78837dab3b499 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -132,14 +132,10 @@ class Pipeline : public std::enable_shared_from_this { return true; } return _data_distribution.distribution_type != - target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); + target_data_distribution.distribution_type; } else { return _data_distribution.distribution_type != - target_data_distribution.distribution_type && - !(is_hash_exchange(_data_distribution.distribution_type) && - is_hash_exchange(target_data_distribution.distribution_type)); + target_data_distribution.distribution_type; } } void init_data_distribution() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 9881fe6f6e3449..3633f2d1fd0251 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -720,12 +720,13 @@ public boolean shouldDisableSharedScan(ConnectContext context) { || getShouldColoScan(); } - public boolean ignoreStorageDataDistribution(ConnectContext context) { + public boolean ignoreStorageDataDistribution(ConnectContext context, int numBackends) { return context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.isHasNullAwareLeftAntiJoin() - && getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + && getScanRangeNum() + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends; } public int getScanRangeNum() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 244c08c370965d..19f83a253dbbe1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2039,15 +2039,16 @@ private void computeFragmentHosts() throws Exception { /** * Ignore storage data distribution iff: - * 1. Current fragment is not forced to use data distribution. - * 2. `parallelExecInstanceNum` is larger than scan ranges. - * 3. Use Nereids planner. + * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. + * 2. Use Nereids planner. */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); boolean ignoreStorageDataDistribution = scanNodes.stream() - .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids; + .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, + fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId()) + .scanRangeAssignment.size())) && useNereids; if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2973,12 +2974,13 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc /** * Ignore storage data distribution iff: - * 1. Current fragment is not forced to use data distribution. - * 2. `parallelExecInstanceNum` is larger than scan ranges. - * 3. Use Nereids planner. + * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. + * 2. Use Nereids planner. */ boolean ignoreStorageDataDistribution = scanNodes.stream() - .allMatch(node -> node.ignoreStorageDataDistribution(context)) + .allMatch(node -> node.ignoreStorageDataDistribution(context, + fragmentExecParamsMap.get(node.getFragment().getFragmentId()) + .scanRangeAssignment.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids; From dc0d8a350a3a689a98501569c056e210c2d33ba9 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jan 2024 07:58:48 +0800 Subject: [PATCH 2/2] update --- be/src/pipeline/pipeline.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index d78837dab3b499..148191f4e2dd0c 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -132,10 +132,14 @@ class Pipeline : public std::enable_shared_from_this { return true; } return _data_distribution.distribution_type != - target_data_distribution.distribution_type; + target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); } else { return _data_distribution.distribution_type != - target_data_distribution.distribution_type; + target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); } } void init_data_distribution() {