diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7f1080871b12f7..3b707240aab542 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1970,6 +1970,11 @@ public class Config extends ConfigBase { + " greater than 0, otherwise it defaults to 3." }) public static int job_dictionary_task_consumer_thread_num = 3; + @ConfField(masterOnly = true, description = {"用于执行 Streaming 任务的线程数,值应该大于0,否则默认为10", + "The number of threads used to execute Streaming Tasks, " + + "the value should be greater than 0, if it is <=0, default is 10."}) + public static int job_streaming_task_exec_thread_num = 10; + @ConfField(masterOnly = true, description = {"最大的 Streaming 作业数量,值应该大于0,否则默认为1024", "The maximum number of Streaming jobs, " + "the value should be greater than 0, if it is <=0, default is 1024."}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java index 91b6f475bfc2b3..7e99ca3ada9db1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java @@ -41,9 +41,9 @@ @Log4j2 public class StreamingTaskScheduler extends MasterDaemon { private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( + Config.job_streaming_task_exec_thread_num, + Config.job_streaming_task_exec_thread_num, 0, - Config.max_streaming_job_num, - 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(Config.max_streaming_job_num), new CustomThreadFactory("streaming-task-execute"), @@ -120,9 +120,11 @@ private void scheduleOneTask(StreamingInsertTask task) { log.info("prepare to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId()); job.setLastScheduleTaskTimestamp(System.currentTimeMillis()); Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task); - + long start = System.currentTimeMillis(); try { task.execute(); + log.info("Finished executing task, task id: {}, job id: {}, cost {}ms", + task.getTaskId(), task.getJobId(), System.currentTimeMillis() - start); } catch (Exception e) { log.error("Failed to execute task, task id: {}, job id: {}", task.getTaskId(), task.getJobId(), e); }