Describe the question
Which version of DolphinScheduler:
1.3.3
Additional context
If an exception occurs in `TaskExecuteThread`, the task is killed. However, the `kill` method only sets the task instance's status to the KILL state, and `MasterTaskExecThread` does not handle that state: it just keeps waiting.
The relevant code is as follows:
`org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread#run`:
```java
try {
    ...
} catch (Exception e) {
    logger.error("task scheduler failure", e);
    kill(); // some exceptions lead to the task being killed
    responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
    responseCommand.setEndTime(new Date());
    responseCommand.setProcessId(task.getProcessId());
    responseCommand.setAppIds(task.getAppIds());
} finally {
    taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
    ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
    taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
```
`org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread#kill`:
```java
public void kill() {
    if (task != null) {
        try {
            task.cancelApplication(true);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
```
`org.apache.dolphinscheduler.server.master.runner.MasterTaskExecThread#waitTaskQuit`:
```java
public Boolean waitTaskQuit() {
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
            this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // task instance add queue , waiting worker to kill
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance(); // the task instance changes to the KILL state, but the loop keeps waiting
            }
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                pauseTask();
            }
            // task instance finished
            if (taskInstance.getState().typeIsFinished()) {
                // if task is final result , then remove taskInstance from cache
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // updateProcessInstance task instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}
```
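To make the reported behavior concrete, here is a minimal, self-contained model of the polling loop above. It is not DolphinScheduler code: `TaskState`, the two "finished" sets, and the `maxPolls` bound are hypothetical simplifications, assuming (as this issue describes) that the finished check does not cover the killed state. The real loop reads state from the database and runs while `Stopper.isRunning()`, with no bound. With a task that is already killed, the loop that ignores KILL keeps polling until its bound, while the one that treats KILL as terminal returns on the first poll.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class WaitLoopSketch {

    // Hypothetical, simplified task states (not DolphinScheduler's ExecutionStatus).
    enum TaskState { RUNNING, SUCCESS, FAILURE, KILL }

    // Behavior as described in the issue: KILL does not end the wait.
    static final Set<TaskState> FINISHED_WITHOUT_KILL =
            EnumSet.of(TaskState.SUCCESS, TaskState.FAILURE);
    // Proposed behavior: KILL is also a terminal state.
    static final Set<TaskState> FINISHED_WITH_KILL =
            EnumSet.of(TaskState.SUCCESS, TaskState.FAILURE, TaskState.KILL);

    /**
     * Polls the shared task state until it is contained in {@code finished}
     * or {@code maxPolls} is reached (a bound added only for this sketch).
     * Returns the number of polls performed.
     */
    static int waitTaskQuit(AtomicReference<TaskState> state, Set<TaskState> finished,
                            int maxPolls, long sleepMillis) throws InterruptedException {
        int polls = 0;
        while (polls < maxPolls) {
            polls++;
            if (finished.contains(state.get())) {
                break; // terminal state reached: the thread can return
            }
            Thread.sleep(sleepMillis);
        }
        return polls;
    }

    public static void main(String[] args) throws InterruptedException {
        // The task has already been marked as killed.
        AtomicReference<TaskState> state = new AtomicReference<>(TaskState.KILL);
        System.out.println("without KILL, polls = " + waitTaskQuit(state, FINISHED_WITHOUT_KILL, 5, 1));
        System.out.println("with KILL, polls = " + waitTaskQuit(state, FINISHED_WITH_KILL, 5, 1));
    }
}
```

Running `main` prints `without KILL, polls = 5` followed by `with KILL, polls = 1`; in the unbounded real loop, the first case would never return.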
Requirement or improvement
If a task instance's status is set to KILL, the corresponding `MasterTaskExecThread` should not stay blocked in the wait loop. Instead, it should exit so that its thread is returned to the thread pool.
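Why this matters can be shown with a small, hedged illustration: a waiting thread occupies a slot in the pool until it returns. The sketch uses a hypothetical single-thread `ExecutorService` and a hypothetical `waiter` standing in for `MasterTaskExecThread`; the real pool size and wiring in DolphinScheduler may differ. Because the waiter treats KILL as terminal, it returns once the task is killed and the pool thread becomes free for the next submitted task. If the waiter ignored KILL, the `get` calls would time out instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PoolReleaseSketch {

    // Hypothetical, simplified task states.
    enum TaskState { RUNNING, SUCCESS, FAILURE, KILL }

    // Proposed check: KILL counts as a terminal state.
    static boolean isFinished(TaskState s) {
        return s == TaskState.SUCCESS || s == TaskState.FAILURE || s == TaskState.KILL;
    }

    // Hypothetical stand-in for MasterTaskExecThread: waits until the task is terminal.
    static Callable<TaskState> waiter(AtomicReference<TaskState> state) {
        return () -> {
            while (!isFinished(state.get())) {
                Thread.sleep(5);
            }
            return state.get();
        };
    }

    public static void main(String[] args) throws Exception {
        // A single-thread pool makes thread exhaustion easy to observe.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            AtomicReference<TaskState> task = new AtomicReference<>(TaskState.RUNNING);
            Future<TaskState> first = pool.submit(waiter(task));
            task.set(TaskState.KILL); // e.g. the task is marked as killed

            // This can only run after the first waiter frees the pool thread.
            Future<String> second = pool.submit(() -> "next task ran");
            System.out.println(first.get(1, TimeUnit.SECONDS));
            System.out.println(second.get(1, TimeUnit.SECONDS));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Running `main` prints `KILL` and then `next task ran`.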