Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,10 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或

BE 配置项。默认为 10,即 10MB/s。该参数为导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于 SSD 等高性能存储设备,可以适当增加这个限速。

4. max\_concurrent\_task\_num\_per\_be

FE 配置项,默认为10,可以运行时修改。该参数限制了每个 BE 节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。

* 如果用户发现许多 Routine load job 都处在 NEED\_SCHEDULER 的状态,则说明整个集群的子任务并发数饱和了。

```整个集群的子任务的并发数 = BE 的个数 * max_concurrent_task_num_per_be``` 这种情况下,首先确定是否所有 RUNNING 中的 job 可以被精简,比如暂停或停止掉已经无用的 job。其次,可以考虑修改这个参数,用于扩大整个集群的子任务并发数。
7 changes: 7 additions & 0 deletions fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_routine_load_task_concurrent_num = 5;

/*
* the max concurrent task num per be
* The cluster max concurrent task num = max_concurrent_task_num_per_be * number of be
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_concurrent_task_num_per_be = 10;

/*
* The max number of files store in SmallFileMgr
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@

public class RoutineLoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class);
private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 10;

// Long is beId, integer is the size of tasks in be
private Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
Expand Down Expand Up @@ -89,7 +88,7 @@ public RoutineLoadManager() {

public void updateBeIdToMaxConcurrentTasks() {
beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
.stream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
.stream().collect(Collectors.toMap(beId -> beId, beId -> Config.max_concurrent_task_num_per_be));
}

// this is not real-time number
Expand Down Expand Up @@ -342,7 +341,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException {
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
} else {
idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM;
idleTaskNum = Config.max_concurrent_task_num_per_be;
}
if (LOG.isDebugEnabled()) {
LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
Expand Down Expand Up @@ -379,7 +378,7 @@ public boolean checkBeToTask(long beId, String clusterName) throws LoadException
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
} else {
idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM;
idleTaskNum = Config.max_concurrent_task_num_per_be;
}
if (idleTaskNum > 0) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
Expand Down Expand Up @@ -331,7 +332,7 @@ public void testGetMinTaskBeIdWhileNoSlot(@Injectable RoutineLoadJob routineLoad
};

RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Deencapsulation.setField(RoutineLoadManager.class, "DEFAULT_BE_CONCURRENT_TASK_NUM", 0);
Config.max_concurrent_task_num_per_be = 0;
Map<Long, RoutineLoadJob> routineLoadJobMap = Maps.newHashMap();
routineLoadJobMap.put(1l, routineLoadJob);
Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", routineLoadJobMap);
Expand Down Expand Up @@ -631,6 +632,7 @@ public void testCheckBeToTask(@Mocked Catalog catalog,
};

RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Config.max_concurrent_task_num_per_be = 10;
Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default"));
}
Expand Down