From bace4a4be58d86f65e5d186e61f7bfe666c34271 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 18 Sep 2024 10:11:04 +0800 Subject: [PATCH 1/3] [Improvement](rpc) Use 1-phase commit if only 1 BE exists --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 6 ++++-- .../main/java/org/apache/doris/qe/SessionVariable.java | 10 ++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5e3a59d9a54d96..a71af26c571a81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -800,14 +800,16 @@ private void sendPipelineCtx() throws TException, RpcException, UserException { beToPipelineExecCtxs.clear(); // fragment:backend List> backendFragments = Lists.newArrayList(); - // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, + // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, // else use exec_plan_fragments directly. // we choose #fragments > 1 because in some cases // we need ensure that A fragment is already prepared to receive data before B fragment sends data. // For example: select * from numbers("number"="10") will generate ExchangeNode and // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not // send data until ExchangeNode is ready to receive. - boolean twoPhaseExecution = fragments.size() > 1; + boolean twoPhaseExecution = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt() + ? fragments.size() > 1 && addressToBackendID.size() > 1 : fragments.size() > 1; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index a97a098bdc122b..29586c5953e378 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -238,6 +238,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree"; + public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = "enable_single_phase_execution_commit_opt"; public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; @@ -1400,6 +1401,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED) public boolean enableLocalExchange = true; + @VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, fuzzy = true, + varType = VariableAnnotation.DEPRECATED) + private boolean enableSinglePhaseExecutionCommitOpt = true; + /** * For debug purpose, don't merge unique key and agg key when reading data. */ @@ -2198,6 +2203,7 @@ public void initFuzzyModeVariables() { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4382,4 +4388,8 @@ public TSerdeDialect getSerdeDialect() { throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect); } } + + public boolean isEnableSinglePhaseExecutionCommitOpt() { + return enableSinglePhaseExecutionCommitOpt; + } } From 8b4d0872b44e7cbf7306335c7d5934c6af0b73dc Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 18 Sep 2024 10:48:58 +0800 Subject: [PATCH 2/3] update --- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a71af26c571a81..c3d1789afcba6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -800,8 +800,8 @@ private void sendPipelineCtx() throws TException, RpcException, UserException { beToPipelineExecCtxs.clear(); // fragment:backend List> backendFragments = Lists.newArrayList(); - // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, - // else use exec_plan_fragments directly. + // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with exec_plan_fragments_prepare + // and exec_plan_fragments_start, else use exec_plan_fragments directly. // we choose #fragments > 1 because in some cases // we need ensure that A fragment is already prepared to receive data before B fragment sends data. // For example: select * from numbers("number"="10") will generate ExchangeNode and From e39c67a7402de7584b82c3e05eb3a4b4a706feac Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 18 Sep 2024 15:04:30 +0800 Subject: [PATCH 3/3] update --- be/src/vec/sink/vdata_stream_sender.cpp | 6 ------ .../src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f18467fbad9632..0f2b7f0f9c3eb3 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -66,12 +66,6 @@ Status Channel::init_stub(RuntimeState* state) { _is_local &= state->query_options().enable_local_exchange; } if (_is_local) { - auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr( - _fragment_instance_id, _dest_node_id, &_local_recvr); - if (!st.ok()) { - // Recvr not found. Maybe downstream task is finished already. - LOG(INFO) << "Recvr is not found : " << st.to_string(); - } return Status::OK(); } if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 29586c5953e378..9fd8022f0f0e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -4390,6 +4390,6 @@ public TSerdeDialect getSerdeDialect() { } public boolean isEnableSinglePhaseExecutionCommitOpt() { - return enableSinglePhaseExecutionCommitOpt; + return enableSinglePhaseExecutionCommitOpt && getEnableLocalExchange(); } }