Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Master]serial_wait strategy workflow unable to wake up #15270

Merged
merged 4 commits into from
Jan 12, 2024

Conversation

Gallardot
Copy link
Member

@Gallardot Gallardot commented Dec 4, 2023

Purpose of the pull request

This is an analysis of a bug related to the serial wait strategy, which causes the workflow instance to remain in a waiting state indefinitely.

When a workflow's scheduled strategy is SERIAL_WAIT, if a workflow instance's status is WAITING, then this workflow instance will remain in a waiting state, even if the previous workflow instance has already completed execution.

There is a certain probability that this problem will occur.

The analysis of the cause is as follows: The MasterSchedulerBootstrap thread processes commands through the handleCommand method. Note that this handleCommand is within a transaction. In this transaction, the saveSerialProcess method is used to modify the status of the workflow instance. However, At the same time, in another thread pool of WorkflowExecuteRunnable, the checkSerialProcess method is used to check the status of the workflow instance in order to wake up the workflow instance in a waiting state.

Everything seems fine. But there is a specific situation. That is, a workflow instance is about to complete, and a workflow instance is being created. Problems will arise at this time. Because of the isolation of transactions, the saveSerialProcess in the handleCommand method may have just been executed, but it has not yet been committed. At this time, the checkSerialProcess method will not be able to check that the status of this workflow instance is WAITING, causing this workflow instance to remain in a waiting state and cannot be awakened.

My solution is to use a new transaction for updating the status of the workflow instance in the handleCommand transaction. This can avoid the above problem. I have been running this in my environment for two months, and the problem has not reoccurred

@Override
@Transactional
public @Nullable ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
log.error("scan command, command parameter is error: {}", command);
commandService.moveToErrorCommand(command, "process instance is null");
return null;
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
processInstance.setTestFlag(command.getTestFlag());
// if the processDefinition is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
setSubProcessParam(processInstance);
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
deleteCommandWithCheck(command.getId());
// todo: this is a bad design to return null here, whether trigger the task
return null;
}

protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE,
processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}

public void checkSerialProcess(ProcessDefinition processDefinition) {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
int nextInstanceId = workflowInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
ProcessInstance nextProcessInstance =
this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(),
WorkflowExecutionStatus.SERIAL_WAIT.getCode(), workflowInstance.getId());
if (nextProcessInstance == null) {
return;
}
ProcessInstance nextReadyStopProcessInstance =
this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(),
WorkflowExecutionStatus.READY_STOP.getCode(), workflowInstance.getId());
if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) {
return;
}
nextInstanceId = nextProcessInstance.getId();
}
ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId);
if (nextProcessInstance.getState().isFinished() || nextProcessInstance.getState().isRunning()) {
return;
}
Map<String, Object> cmdParam = new HashMap<>();
// write the parameters of the nextProcessInstance to command
if (StringUtils.isNotEmpty(nextProcessInstance.getCommandParam())) {
Map<String, String> commandStartParamsMap = JSONUtils.toMap(nextProcessInstance.getCommandParam());
if (MapUtils.isNotEmpty(commandStartParamsMap)) {
Map<String, String> paramsMap = JSONUtils.toMap(commandStartParamsMap.get(CMD_PARAM_START_PARAMS));
if (MapUtils.isNotEmpty(paramsMap)) {
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(paramsMap));
}
}
}
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId);
Command command = new Command();
command.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command.setProcessInstanceId(nextProcessInstance.getId());
command.setProcessDefinitionCode(processDefinition.getCode());
command.setProcessDefinitionVersion(processDefinition.getVersion());
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
commandService.createCommand(command);
}

Brief change log

Verify this pull request

This pull request is code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(or)

If your pull request contain incompatible change, you should also add it to docs/docs/en/guide/upgrede/incompatible.md

@Gallardot Gallardot changed the title fix: serial_wait strategy workflow unable to wake up [bug][master]: serial_wait strategy workflow unable to wake up Dec 4, 2023
@Gallardot Gallardot changed the title [bug][master]: serial_wait strategy workflow unable to wake up [Bug][Master]: serial_wait strategy workflow unable to wake up Dec 4, 2023
@codecov-commenter
Copy link

codecov-commenter commented Dec 4, 2023

Codecov Report

Attention: 4 lines in your changes are missing coverage. Please review.

Comparison is base (3c7a77c) 38.11% compared to head (621aebe) 38.11%.

❗ Current head 621aebe differs from pull request most recent head 194a954. Consider uploading reports for the commit 194a954 to get more accurate results

Files Patch % Lines
...er/dao/repository/impl/ProcessInstanceDaoImpl.java 0.00% 2 Missing ⚠️
.../server/master/runner/WorkflowExecuteRunnable.java 0.00% 1 Missing ⚠️
...nscheduler/service/process/ProcessServiceImpl.java 80.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##                dev   #15270   +/-   ##
=========================================
  Coverage     38.11%   38.11%           
- Complexity     4696     4697    +1     
=========================================
  Files          1299     1299           
  Lines         44775    44777    +2     
  Branches       4797     4797           
=========================================
+ Hits          17066    17067    +1     
- Misses        25861    25863    +2     
+ Partials       1848     1847    -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Gallardot Gallardot changed the title [Bug][Master]: serial_wait strategy workflow unable to wake up [Bug][Master]serial_wait strategy workflow unable to wake up Jan 8, 2024
Signed-off-by: Gallardot <gallardot@apache.org>
Signed-off-by: Gallardot <gallardot@apache.org>
Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, this is a hot fix PR.
Serial wait notify logic should be refactored, other with if the notify failed, the seaitl wait workflow instance will always in running.

@Gallardot Gallardot marked this pull request as ready for review January 12, 2024 01:34
@Gallardot
Copy link
Member Author

Gallardot commented Jan 12, 2024

This is an analysis of a bug related to the serial wait strategy, which causes the workflow instance to remain in a waiting state indefinitely.

When a workflow's scheduled strategy is SERIAL_WAIT, if a workflow instance's status is WAITING, then this workflow instance will remain in a waiting state, even if the previous workflow instance has already completed execution.

There is a certain probability that this problem will occur.

The analysis of the cause is as follows: The MasterSchedulerBootstrap thread processes commands through the handleCommand method. Note that this handleCommand is within a transaction. In this transaction, the saveSerialProcess method is used to modify the status of the workflow instance. However, At the same time, in another thread pool of WorkflowExecuteRunnable, the checkSerialProcess method is used to check the status of the workflow instance in order to wake up the workflow instance in a waiting state.

Everything seems fine. But there is a specific situation. That is, a workflow instance is about to complete, and a workflow instance is being created. Problems will arise at this time. Because of the isolation of transactions, the saveSerialProcess in the handleCommand method may have just been executed, but it has not yet been committed. At this time, the checkSerialProcess method will not be able to check that the status of this workflow instance is WAITING, causing this workflow instance to remain in a waiting state and cannot be awakened.

My solution is to use a new transaction for updating the status of the workflow instance in the handleCommand transaction. This can avoid the above problem. I have been running this in my environment for two months, and the problem has not reoccurred

@Override
@Transactional
public @Nullable ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
log.error("scan command, command parameter is error: {}", command);
commandService.moveToErrorCommand(command, "process instance is null");
return null;
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
processInstance.setTestFlag(command.getTestFlag());
// if the processDefinition is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
setSubProcessParam(processInstance);
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
deleteCommandWithCheck(command.getId());
// todo: this is a bad design to return null here, whether trigger the task
return null;
}

protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE,
processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}

public void checkSerialProcess(ProcessDefinition processDefinition) {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
int nextInstanceId = workflowInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
ProcessInstance nextProcessInstance =
this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(),
WorkflowExecutionStatus.SERIAL_WAIT.getCode(), workflowInstance.getId());
if (nextProcessInstance == null) {
return;
}
ProcessInstance nextReadyStopProcessInstance =
this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(),
WorkflowExecutionStatus.READY_STOP.getCode(), workflowInstance.getId());
if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) {
return;
}
nextInstanceId = nextProcessInstance.getId();
}
ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId);
if (nextProcessInstance.getState().isFinished() || nextProcessInstance.getState().isRunning()) {
return;
}
Map<String, Object> cmdParam = new HashMap<>();
// write the parameters of the nextProcessInstance to command
if (StringUtils.isNotEmpty(nextProcessInstance.getCommandParam())) {
Map<String, String> commandStartParamsMap = JSONUtils.toMap(nextProcessInstance.getCommandParam());
if (MapUtils.isNotEmpty(commandStartParamsMap)) {
Map<String, String> paramsMap = JSONUtils.toMap(commandStartParamsMap.get(CMD_PARAM_START_PARAMS));
if (MapUtils.isNotEmpty(paramsMap)) {
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(paramsMap));
}
}
}
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId);
Command command = new Command();
command.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command.setProcessInstanceId(nextProcessInstance.getId());
command.setProcessDefinitionCode(processDefinition.getCode());
command.setProcessDefinitionVersion(processDefinition.getVersion());
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
commandService.createCommand(command);
}

@ruanwenjun @Radeity @EricGao888 @SbloodyS @fuchanghai @qingwli @caishunfeng PTAL.

@fuchanghai
Copy link
Member

LGTM, this is a hot fix PR. Serial wait notify logic should be refactored, other with if the notify failed, the seaitl wait workflow instance will always in running.

agree with @ruanwenjun

fuchanghai

This comment was marked as outdated.

@fuchanghai
Copy link
Member

hi @Gallardot Can you associate the issue with this PR?

Copy link
Member

@fuchanghai fuchanghai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @Gallardot I remember that for nested things, you need to use a proxy object to make it effective. and need add EnableAspectJAutoProxy on Application cc @ruanwenjun

@Gallardot
Copy link
Member Author

hi @Gallardot Can you associate the issue with this PR?

There's already some issue from the community.

#14182 #11647

@Gallardot
Copy link
Member Author

hi @Gallardot I remember that for nested things, you need to use a proxy object to make it effective. and need add EnableAspectJAutoProxy on Application cc @ruanwenjun

Which part does it mean?

@fuchanghai
Copy link
Member

fuchanghai commented Jan 12, 2024

hi @Gallardot I remember that for nested things, you need to use a proxy object to make it effective. and need add EnableAspectJAutoProxy on Application cc @ruanwenjun

Which part does it mean?

image The transaction has been opened on the handleCommand method, and I am not sure whether the new transaction in this method will take effect. AFAIK, this is a nested transaction and needs to be enabled for annotations to take effect. ```@ EnableAspectJAutoProxy(exposeProxy = true) ```,Maybe I'm wrong, can you test whether the database data is updated after running the upsert method? cc @EricGao888 @ruanwenjun

@fuchanghai
Copy link
Member

hi @Gallardot I remember that for nested things, you need to use a proxy object to make it effective. and need add EnableAspectJAutoProxy on Application cc @ruanwenjun

Which part does it mean?

image The transaction has been opened on the handleCommand method, and I am not sure whether the new transaction in this method will take effect. AFAIK, this is a nested transaction and needs to be enabled for annotations to take effect. @ EnableAspectJAutoProxy(exposeProxy = true) ,Maybe I'm wrong, can you test whether the database data is updated after running the upsert method? cc @EricGao888 @ruanwenjun

@Gallardot Sorry I didn't notice EnableTransactionManagement, and LGTM

@Gallardot
Copy link
Member Author

The transaction has been opened on the handleCommand method, and I am not sure whether the new transaction in this method will take effect. AFAIK, this is a nested transaction and needs to be enabled for annotations to take effect. @ EnableAspectJAutoProxy(exposeProxy = true) ,Maybe I'm wrong, you can test whether the database data is updated after running the upsert method? cc @EricGao888 @ruanwenjun

As I mentioned in the Purpose of the pull request, this issue is caused by the fact that handleCommand has initiated a transaction that has not been committed. Therefore, I am using a new transaction to solve this problem.

For the usage of the Transactional annotation in Spring, you can refer to transaction-declarative-attransactional-settings

@EricGao888 EricGao888 added this to the 3.2.1 milestone Jan 12, 2024
@fuchanghai fuchanghai added 3.2.1 bug Something isn't working labels Jan 12, 2024
Copy link

sonarcloud bot commented Jan 12, 2024

Quality Gate Failed Quality Gate failed

Failed conditions

50.0% Coverage on New Code (required ≥ 60%)

See analysis details on SonarCloud

@fuchanghai fuchanghai merged commit a405abe into apache:dev Jan 12, 2024
55 of 56 checks passed
@Gallardot Gallardot deleted the fix-waitpolicy2 branch January 13, 2024 10:23
Gallardot added a commit to Gallardot/dolphinscheduler that referenced this pull request Mar 14, 2024
…15270)

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

---------

Signed-off-by: Gallardot <gallardot@apache.org>
Co-authored-by: fuchanghai <changhaifu@apache.org>
Gallardot pushed a commit to Gallardot/dolphinscheduler that referenced this pull request Mar 14, 2024
[Bug][Master]serial_wait strategy workflow unable to wake up (apache#15270)

See merge request logan/devops/apache/dolphinscheduler!13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants