Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ public class Config extends ConfigBase {
"Whether to enable memtable on sink node by default in stream load"})
public static boolean stream_load_default_memtable_on_sink_node = false;

@ConfField(mutable = true, masterOnly = true, description = {
"Whether to enable forwarding group commit stream load to follower nodes."
+ " If true, stream load with group commit mode will be forwarded to a follower FE round robin."})
public static boolean enable_forward_group_commit_stream_load = false;

@ConfField(mutable = true, masterOnly = true, description = {"Maximum timeout for load jobs, in seconds."})
public static int max_load_timeout_second = 259200; // 3days

Expand Down Expand Up @@ -3331,7 +3336,7 @@ public static int metaServiceRpcRetryTimes() {
"Whether to enable group commit streamload BE forward feature in cloud mode. "
+ "Solves the issue where LB random forwarding breaks group commit batching "
+ "by implementing BE-level forwarding to ensure same-table requests reach the same BE node."})
public static boolean enable_group_commit_streamload_be_forward = false;
public static boolean enable_group_commit_streamload_be_forward = true;

@ConfField(description = {"When creating a table in cloud mode, check if recycler keys remain. Default is true."})
public static boolean check_create_table_recycle_key_remained = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.datasource.maxcompute.MCTransaction;
import org.apache.doris.encryption.EncryptionKey;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.info.TableRefInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
Expand All @@ -107,6 +109,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionOp;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
Expand Down Expand Up @@ -135,6 +138,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.tablefunction.MetadataGenerator;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TAbortRemoteTxnRequest;
Expand Down Expand Up @@ -363,6 +367,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);

private static final String NOT_MASTER_ERR_MSG = "FE is not master";
private static final AtomicInteger GROUP_COMMIT_FOLLOWER_INDEX = new AtomicInteger(0);

private MasterImpl masterImpl;
private ExecuteEnv exeEnv;
Expand Down Expand Up @@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String label, long txnId, String d
EtlJobType.INSERT, createTime, failMsg, trackingUrl, firstErrorMsg, userIdentity, -1);
}

private static int nextGroupCommitFollowerIndex(int followerCount) {
return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(), followerCount);
}

private TStreamLoadPutResult forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
List<Frontend> followers = Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
.filter(fe -> fe.isAlive() && !(fe.getHost().equals(selfNode.getHost())
&& fe.getEditLogPort() == selfNode.getPort())).collect(
Collectors.toList());
if (CollectionUtils.isEmpty(followers)) {
return null;
}

// check table enable light_schema_change and group commit does not block for schema change
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
OlapTable table = (OlapTable) db.getTableOrDdlException(request.getTbl());
if (!table.getTableProperty().getUseSchemaLightChange()) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(
"table light_schema_change is false, can't do stream load with group commit mode");
return result;
}
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
String msg = "insert table " + table.getId() + GroupCommitPlanner.SCHEMA_CHANGE;
Comment thread
mymeiyi marked this conversation as resolved.
LOG.info(msg);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(msg);
return result;
}
} catch (Exception e) {
LOG.warn("failed to pre-check group commit stream load, fallback to local. db={}, tbl={}",
request.getDb(), request.getTbl(), e);
return null;
}

int idx = nextGroupCommitFollowerIndex(followers.size());
Frontend follower = followers.get(idx);
TNetworkAddress address = new TNetworkAddress(follower.getHost(), follower.getRpcPort());
LOG.info("forward group commit stream load put to follower {}, db={}, tbl={}, groupCommitMode={}",
address, request.getDb(), request.getTbl(), request.getGroupCommitMode());
FrontendService.Client client = null;
boolean ok = false;
try {
client = ClientPool.frontendPool.borrowObject(address);
TStreamLoadPutResult streamLoadPutResult = client.streamLoadPut(request);
ok = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This synchronous forward creates a nested FE RPC cycle for every forwarded group-commit load: the master RPC worker blocks here waiting for the follower, and the follower's StreamLoadHandler.generatePlan() calls GroupCommitManager.selectBackendForGroupCommit(), whose non-master branch calls back to the master via MasterOpExecutor.getGroupCommitLoadBeId(). With enough concurrent group-commit stream loads, all master RPC workers can be occupied waiting on followers while the followers are waiting for free master RPC workers, causing the loads to stall until RPC timeout. The previous follower-direct path only used one follower-to-master RPC and did not hold a master worker while waiting for that callback. Please avoid the master->follower->master cycle, or make the forwarded request carry the already-selected group-commit backend/table routing decision so the follower does not need to call back to the master.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamLoadHandler.generatePlan() does not call GroupCommitManager.selectBackendForGroupCommit()

return streamLoadPutResult;
} catch (Exception e) {
LOG.warn("failed to forward stream load put to follower: {}, fallback to local", address, e);
} finally {
if (ok) {
ClientPool.frontendPool.returnObject(address, client);
} else {
ClientPool.frontendPool.invalidateObject(address, client);
}
}
return null;
}

@Override
public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load put request: {}, backend: {}", request, clientAddr);
}

String groupCommitMode = request.getGroupCommitMode();
if (groupCommitMode != null && !groupCommitMode.equals("off_mode") && Env.getCurrentEnv().isMaster()
&& Config.enable_forward_group_commit_stream_load) {
TStreamLoadPutResult result = forwardGroupCommitStreamLoad(request);
if (result != null) {
return result;
}
}

TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import org.codehaus.groovy.runtime.IOGroovyMethods

suite('test_group_commit_stream_load_multi_follower', 'docker') {
def databaseName = context.config.getDbNameByFile(context.file)
def tableName = "tbl"

def groupCommitStreamLoad = { fe ->
def feIp = fe.getHttpAddress()[0]
def fePort = fe.getHttpAddress()[1]
def command = """ curl -sS --location-trusted -u root:
-H group_commit:async_mode
-H column_separator:,
-H columns:id,name
-T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
log.info("group commit command: ${command}")

def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
def out = process.getText()
logger.info("load through fe {}:{} master={} code={}, out={}, err={}",
feIp, fePort, fe.isMaster, code, out, err)
assertEquals(code, 0)
Comment thread
mymeiyi marked this conversation as resolved.

def json = parseJson(out)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
assertTrue(json.Label.startsWith("group_commit_"))
assertEquals(2, json.NumberTotalRows)
assertEquals(2, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
}

def getRowCount = { expectedRowCount ->
def retry = 0
while (retry < 30) {
sleep(1000)
def rowCount = sql "select count(*) from ${databaseName}.${tableName}"
logger.info("rowCount: " + rowCount + ", retry: " + retry)
if (rowCount[0][0] >= expectedRowCount) {
break
}
retry++
}
}

def options = new ClusterOptions()
options.feNum = 3
options.beNum = 1
options.cloudMode = true
options.useFollowersMode = true
options.beConfigs.add('enable_java_support=false')
options.feConfigs.add('enable_forward_group_commit_stream_load=true')
options.feConfigs.add('cloud_cluster_check_interval_second=1')
options.feConfigs.add('heartbeat_interval_second=1')
docker(options) {
awaitUntil(60) {
def ret = sql_return_maparray """SHOW FRONTENDS"""
ret.size() == 3
&& ret.count { it.Role.contains("FOLLOWER") } == 3
&& ret.count { it.IsMaster == "true" } == 1
&& ret.count { it.Alive == "true" } == 3
}

def frontendRows = sql_return_maparray """SHOW FRONTENDS"""
logger.info("show frontends result {}", frontendRows)
frontendRows.each { row ->
assertTrue(row.Role.contains("FOLLOWER"))
assertEquals("true", row.Alive)
}

sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
sql """ DROP TABLE IF EXISTS ${databaseName}.${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
`id` int(11) NOT NULL,
`name` varchar(1100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
DUPLICATE KEY(`id`, `name`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"group_commit_interval_ms" = "200"
);
"""

def expectedRowCount = 0
def frontends = cluster.getAllFrontends(true).sort { it.index }
for (int i = 0; i < 2; i++) {
frontends.each { fe ->
groupCommitStreamLoad(fe)
expectedRowCount += 2
}
}
getRowCount(expectedRowCount)

def rowCount = sql "select count(*) from ${databaseName}.${tableName}"
logger.info("rowCount: " + rowCount)
assertEquals(expectedRowCount, rowCount[0][0])
Comment thread
mymeiyi marked this conversation as resolved.
}
}
Loading