Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
_is_local &= state->query_options().enable_local_exchange;
}
if (_is_local) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
if (!st.ok()) {
// Recvr not found. Maybe downstream task is finished already.
LOG(INFO) << "Recvr is not found : " << st.to_string();
}
return Status::OK();
}
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
Expand Down
8 changes: 5 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -800,14 +800,16 @@ private void sendPipelineCtx() throws TException, RpcException, UserException {
beToPipelineExecCtxs.clear();
// fragment:backend
List<Pair<PlanFragmentId, Long>> backendFragments = Lists.newArrayList();
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
// If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with exec_plan_fragments_prepare
// and exec_plan_fragments_start, else use exec_plan_fragments directly.
// we choose #fragments > 1 because in some cases
// we need ensure that A fragment is already prepared to receive data before B fragment sends data.
// For example: select * from numbers("number"="10") will generate ExchangeNode and
// TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() > 1;
boolean twoPhaseExecution = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().isEnableSinglePhaseExecutionCommitOpt()
? fragments.size() > 1 && addressToBackendID.size() > 1 : fragments.size() > 1;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());

Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree";

public static final String ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT = "enable_single_phase_execution_commit_opt";
public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree";
public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";

Expand Down Expand Up @@ -1400,6 +1401,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED)
public boolean enableLocalExchange = true;

@VariableMgr.VarAttr(name = ENABLE_SINGLE_PHASE_EXECUTION_COMMIT_OPT, fuzzy = true,
varType = VariableAnnotation.DEPRECATED)
private boolean enableSinglePhaseExecutionCommitOpt = true;

/**
* For debug purpose, don't merge unique key and agg key when reading data.
*/
Expand Down Expand Up @@ -2198,6 +2203,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.enableSinglePhaseExecutionCommitOpt = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -4382,4 +4388,8 @@ public TSerdeDialect getSerdeDialect() {
throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect);
}
}

public boolean isEnableSinglePhaseExecutionCommitOpt() {
return enableSinglePhaseExecutionCommitOpt && getEnableLocalExchange();
}
}