From 7a7e066dff6e814d4dd5c3ceac81be06f7932329 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Thu, 22 Aug 2019 21:22:00 +0800 Subject: [PATCH 1/3] Add config max_concurrent_task_num_per_be This config is used to control the max concurrent task num per be. The cluster max concurrent task num = max_concurrent_task_num_per_be * number of be. --- .../administrator-guide/load-data/routine-load-manual.md | 6 ++++++ fe/src/main/java/org/apache/doris/common/Config.java | 7 +++++++ .../apache/doris/load/routineload/RoutineLoadManager.java | 7 +++---- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md index e791f3325efcaf..c422021498472a 100644 --- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md @@ -257,4 +257,10 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或 BE 配置项。默认为 10,即 10MB/s。该参数为导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于 SSD 等高性能存储设备,可以适当增加这个限速。 +4. max\_concurrent\_task\_num\_per\_be + FE 配置项,默认为10,可以运行时修改。该参数限制了每个 BE 节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。 + + * 如果用户发现许多 Routine load job 都处在 NEED\_SCHEDULER 的状态,则说明整个集群的子任务并发数饱和了。 + + ```整个集群的子任务的并发数 = BE 的个数 * max_concurrent_task_num_per_be``` 这种情况下,首先确定是否所有 RUNNING 中的 job 可以被精简,比如暂停或停止掉已经无用的 job。其次,可以考虑修改这个参数,用于扩大整个集群的子任务并发数。 diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 1449d4b7e8bd00..39858e2a3eecf0 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -818,6 +818,13 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_routine_load_task_concurrent_num = 5; + /* + * the max concurrent task num per be + * The cluster max concurrent task num = max_concurrent_task_num_per_be * number of be + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_concurrent_task_num_per_be = 10; + /* * The max number of files store in SmallFileMgr */ diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3a8ee60b9ef7c4..7aba4310a755c0 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -57,7 +57,6 @@ public class RoutineLoadManager implements Writable { private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); - private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 10; // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks = Maps.newHashMap(); @@ -89,7 +88,7 @@ public RoutineLoadManager() { public void updateBeIdToMaxConcurrentTasks() { beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) - .stream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); + .stream().collect(Collectors.toMap(beId -> beId, beId -> Config.max_concurrent_task_num_per_be)); } // this is not real-time number @@ -342,7 +341,7 @@ public long getMinTaskBeId(String clusterName) throws LoadException { if (beIdToConcurrentTasks.containsKey(beId)) { idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); } else { - idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; + idleTaskNum = Config.max_concurrent_task_num_per_be; } if (LOG.isDebugEnabled()) { LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum, @@ -379,7 +378,7 @@ public boolean checkBeToTask(long beId, String clusterName) throws LoadException if (beIdToConcurrentTasks.containsKey(beId)) { idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId); } else { - idleTaskNum = DEFAULT_BE_CONCURRENT_TASK_NUM; + idleTaskNum = Config.max_concurrent_task_num_per_be; } if (idleTaskNum > 0) { return true; From 01d5c5b762093fceed43b51db0408e9572a01038 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Fri, 23 Aug 2019 10:34:04 +0800 Subject: [PATCH 2/3] Fix ut --- .../apache/doris/load/routineload/RoutineLoadManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 33d6586ec80a7c..fa86535c758e2b 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -331,7 +332,7 @@ public void testGetMinTaskBeIdWhileNoSlot(@Injectable RoutineLoadJob routineLoad }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - Deencapsulation.setField(RoutineLoadManager.class, "DEFAULT_BE_CONCURRENT_TASK_NUM", 0); + Config.max_concurrent_task_num_per_be = 0; Map routineLoadJobMap = Maps.newHashMap(); routineLoadJobMap.put(1l, routineLoadJob); Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", routineLoadJobMap); From e7aa913509ddc2dd38ba9ce89a914bd06aa1ee37 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Fri, 23 Aug 2019 11:05:14 +0800 Subject: [PATCH 3/3] Fix ut --- .../apache/doris/load/routineload/RoutineLoadManagerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index fa86535c758e2b..0bd3a91aea4fa6 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -632,6 +632,7 @@ public void testCheckBeToTask(@Mocked Catalog catalog, }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + Config.max_concurrent_task_num_per_be = 10; Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks); Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default")); }