diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5c242bd25fade8..d1cfb0aa113cdd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -585,6 +585,11 @@ public class Config extends ConfigBase {
             "Whether to enable memtable on sink node by default in stream load"})
     public static boolean stream_load_default_memtable_on_sink_node = false;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "Whether to enable forwarding group commit stream load to follower nodes."
+            + " If true, stream load with group commit mode will be forwarded to a follower FE round robin."})
+    public static boolean enable_forward_group_commit_stream_load = false;
+
     @ConfField(mutable = true, masterOnly = true, description = {"Maximum timeout for load jobs, in seconds."})
     public static int max_load_timeout_second = 259200; // 3days
 
@@ -3331,7 +3336,7 @@ public static int metaServiceRpcRetryTimes() {
             "Whether to enable group commit streamload BE forward feature in cloud mode. "
                     + "Solves the issue where LB random forwarding breaks group commit batching "
                     + "by implementing BE-level forwarding to ensure same-table requests reach the same BE node."})
-    public static boolean enable_group_commit_streamload_be_forward = false;
+    public static boolean enable_group_commit_streamload_be_forward = true;
 
     @ConfField(description = {"When creating a table in cloud mode, check if recycler keys remain. Default is true."})
     public static boolean check_create_table_recycle_key_remained = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 308b1e1e51ed8c..7437fdcdb5b13a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -55,6 +55,7 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
@@ -85,6 +86,7 @@
 import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.datasource.maxcompute.MCTransaction;
 import org.apache.doris.encryption.EncryptionKey;
+import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.info.TableRefInfo;
 import org.apache.doris.insertoverwrite.InsertOverwriteManager;
 import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
@@ -107,6 +109,7 @@
 import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionOp;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectContext.ConnectType;
@@ -135,6 +138,7 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.tablefunction.MetadataGenerator;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TAbortRemoteTxnRequest;
@@ -363,6 +367,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
 
     private static final String NOT_MASTER_ERR_MSG = "FE is not master";
+    private static final AtomicInteger GROUP_COMMIT_FOLLOWER_INDEX = new AtomicInteger(0);
 
     private MasterImpl masterImpl;
     private ExecuteEnv exeEnv;
@@ -2779,6 +2784,88 @@ private void recordFinishedLoadJobRequestImpl(String label, long txnId, String d
                 EtlJobType.INSERT, createTime, failMsg, trackingUrl, firstErrorMsg, userIdentity, -1);
     }
 
+    /**
+     * Returns the next follower slot in round-robin order.
+     *
+     * <p>{@link Math#floorMod(int, int)} keeps the result non-negative once the shared counter wraps around.
+     *
+     * @param followerCount number of candidate followers, must be positive
+     * @return an index in {@code [0, followerCount)}
+     */
+    private static int nextGroupCommitFollowerIndex(int followerCount) {
+        return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(), followerCount);
+    }
+
+    /**
+     * Forwards a group commit stream load put request to another alive follower FE, picked round robin.
+     *
+     * <p>The target table is pre-checked locally first: a table without light schema change, or one that
+     * the group commit manager reports as blocked, is rejected with {@code ANALYSIS_ERROR} and not forwarded.
+     *
+     * @param request the stream load put request to forward
+     * @return the follower's result or the local rejection; {@code null} if there is no other alive follower,
+     *         the pre-check throws, or the RPC fails, so that the caller falls back to local planning
+     */
+    private TStreamLoadPutResult forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+        HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+        List<Frontend> followers = Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+                .filter(fe -> fe.isAlive() && !(fe.getHost().equals(selfNode.getHost())
+                        && fe.getEditLogPort() == selfNode.getPort())).collect(
+                        Collectors.toList());
+        if (CollectionUtils.isEmpty(followers)) {
+            return null;
+        }
+
+        // check table enable light_schema_change and group commit does not block for schema change
+        TStreamLoadPutResult result = new TStreamLoadPutResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
+            OlapTable table = (OlapTable) db.getTableOrDdlException(request.getTbl());
+            if (!table.getTableProperty().getUseSchemaLightChange()) {
+                status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+                status.addToErrorMsgs(
+                        "table light_schema_change is false, can't do stream load with group commit mode");
+                return result;
+            }
+            if (Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+                String msg = "insert table " + table.getId() + GroupCommitPlanner.SCHEMA_CHANGE;
+                LOG.info(msg);
+                status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+                status.addToErrorMsgs(msg);
+                return result;
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to pre-check group commit stream load, fallback to local. db={}, tbl={}",
+                    request.getDb(), request.getTbl(), e);
+            return null;
+        }
+
+        int idx = nextGroupCommitFollowerIndex(followers.size());
+        Frontend follower = followers.get(idx);
+        TNetworkAddress address = new TNetworkAddress(follower.getHost(), follower.getRpcPort());
+        LOG.info("forward group commit stream load put to follower {}, db={}, tbl={}, groupCommitMode={}",
+                address, request.getDb(), request.getTbl(), request.getGroupCommitMode());
+        FrontendService.Client client = null;
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendPool.borrowObject(address);
+            TStreamLoadPutResult streamLoadPutResult = client.streamLoadPut(request);
+            ok = true;
+            return streamLoadPutResult;
+        } catch (Exception e) {
+            LOG.warn("failed to forward stream load put to follower: {}, fallback to local", address, e);
+        } finally {
+            if (ok) {
+                ClientPool.frontendPool.returnObject(address, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(address, client);
+            }
+        }
+        return null;
+    }
+
     @Override
     public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
         String clientAddr = getClientAddrAsString();
@@ -2786,6 +2873,15 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
             LOG.debug("receive stream load put request: {}, backend: {}", request, clientAddr);
         }
 
+        String groupCommitMode = request.getGroupCommitMode();
+        if (groupCommitMode != null && !groupCommitMode.equals("off_mode") && Env.getCurrentEnv().isMaster()
+                && Config.enable_forward_group_commit_stream_load) {
+            TStreamLoadPutResult result = forwardGroupCommitStreamLoad(request);
+            if (result != null) {
+                return result;
+            }
+        }
+
         TStreamLoadPutResult result = new TStreamLoadPutResult();
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_multi_follower.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_multi_follower.groovy
new file mode 100644
index 00000000000000..f0721a5f689eb4
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_multi_follower.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_group_commit_stream_load_multi_follower', 'docker') {
+    def databaseName = context.config.getDbNameByFile(context.file)
+    def tableName = "tbl"
+
+    def groupCommitStreamLoad = { fe ->
+        def feIp = fe.getHttpAddress()[0]
+        def fePort = fe.getHttpAddress()[1]
+        def command = """ curl -sS --location-trusted -u root:
+                -H group_commit:async_mode
+                -H column_separator:,
+                -H columns:id,name
+                -T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+                http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+        log.info("group commit command: ${command}")
+
+        def process = command.execute()
+        def code = process.waitFor()
+        def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+        def out = process.getText()
+        logger.info("load through fe {}:{} master={} code={}, out={}, err={}",
+                feIp, fePort, fe.isMaster, code, out, err)
+        assertEquals(0, code)
+
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(2, json.NumberTotalRows)
+        assertEquals(2, json.NumberLoadedRows)
+        assertEquals(0, json.NumberFilteredRows)
+        assertEquals(0, json.NumberUnselectedRows)
+        assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+    }
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(1000)
+            def rowCount = sql "select count(*) from ${databaseName}.${tableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    def options = new ClusterOptions()
+    options.feNum = 3
+    options.beNum = 1
+    options.cloudMode = true
+    options.useFollowersMode = true
+    options.beConfigs.add('enable_java_support=false')
+    options.feConfigs.add('enable_forward_group_commit_stream_load=true')
+    options.feConfigs.add('cloud_cluster_check_interval_second=1')
+    options.feConfigs.add('heartbeat_interval_second=1')
+    docker(options) {
+        awaitUntil(60) {
+            def ret = sql_return_maparray """SHOW FRONTENDS"""
+            ret.size() == 3
+                    && ret.count { it.Role.contains("FOLLOWER") } == 3
+                    && ret.count { it.IsMaster == "true" } == 1
+                    && ret.count { it.Alive == "true" } == 3
+        }
+
+        def frontendRows = sql_return_maparray """SHOW FRONTENDS"""
+        logger.info("show frontends result {}", frontendRows)
+        frontendRows.each { row ->
+            assertTrue(row.Role.contains("FOLLOWER"))
+            assertEquals("true", row.Alive)
+        }
+
+        sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
+        sql """ DROP TABLE IF EXISTS ${databaseName}.${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
+                `id` int(11) NOT NULL,
+                `name` varchar(1100) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "group_commit_interval_ms" = "200"
+            );
+        """
+
+        def expectedRowCount = 0
+        def frontends = cluster.getAllFrontends(true).sort { it.index }
+        for (int i = 0; i < 2; i++) {
+            frontends.each { fe ->
+                groupCommitStreamLoad(fe)
+                expectedRowCount += 2
+            }
+        }
+        getRowCount(expectedRowCount)
+
+        def rowCount = sql "select count(*) from ${databaseName}.${tableName}"
+        logger.info("rowCount: " + rowCount)
+        assertEquals(expectedRowCount, rowCount[0][0])
+    }
+}