Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,11 @@ public class Config extends ConfigBase {
+ " greater than 0, otherwise it defaults to 3." })
public static int job_dictionary_task_consumer_thread_num = 3;

@ConfField(masterOnly = true, description = {"用于执行 Streaming 任务的线程数,值应该大于0,否则默认为10",
"The number of threads used to execute Streaming Tasks, "
+ "the value should be greater than 0, if it is <=0, default is 10."})
public static int job_streaming_task_exec_thread_num = 10;

@ConfField(masterOnly = true, description = {"最大的 Streaming 作业数量,值应该大于0,否则默认为1024",
"The maximum number of Streaming jobs, "
+ "the value should be greater than 0, if it is <=0, default is 1024."})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
@Log4j2
public class StreamingTaskScheduler extends MasterDaemon {
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Config.job_streaming_task_exec_thread_num,
Config.job_streaming_task_exec_thread_num,
0,
Config.max_streaming_job_num,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(Config.max_streaming_job_num),
new CustomThreadFactory("streaming-task-execute"),
Expand Down Expand Up @@ -120,9 +120,11 @@ private void scheduleOneTask(StreamingInsertTask task) {
log.info("prepare to schedule task, task id: {}, job id: {}", task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);

long start = System.currentTimeMillis();
try {
task.execute();
log.info("Finished executing task, task id: {}, job id: {}, cost {}ms",
task.getTaskId(), task.getJobId(), System.currentTimeMillis() - start);
} catch (Exception e) {
log.error("Failed to execute task, task id: {}, job id: {}", task.getTaskId(), task.getJobId(), e);
}
Expand Down
Loading