Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Update ack task logic to fail the task if ack failed, and fail the wo…
Browse files Browse the repository at this point in the history
…rkflow if update task failed further. (#1267)
  • Loading branch information
kishorebanala committed Aug 21, 2019
1 parent 67e178a commit 15589bf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ public void updateTask(TaskResult taskResult) {
LOGGER.debug("Task: {} has a {} status and the Workflow has been updated with failed task reference", task, task.getStatus());
}
} catch (Exception e) {
String errorMsg = String.format("Error updating task: %s for workflow: %s, terminating workflow.", task.getTaskId(), workflowId);
String errorMsg = String.format("Error updating task: %s for workflow: %s", task.getTaskId(), workflowId);
LOGGER.error(errorMsg, e);
Monitors.recordTaskUpdateError(task.getTaskType(), workflowInstance.getWorkflowName());
throw new ApplicationException(Code.BACKEND_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ public List<PollData> getAllPollData() {

}

public void terminateWorkflow(String workflowId, String reason) {
workflowExecutor.terminateWorkflow(workflowId, reason);
}

//For backward compatibility - to be removed in the later versions
public void updateTask(Task task) {
updateTask(new TaskResult(task));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,31 @@ public boolean ackTaskReceived(String taskId) {
// Fail the task and let decide reevaluate the workflow, thereby preventing workflow being stuck from transient ack errors.
String errorMsg = String.format("Error when trying to ack task %s", taskId);
LOGGER.error(errorMsg, e);
try {
Task task = executionService.getTask(taskId);
Monitors.recordAckTaskError(task.getTaskType());
failTask(task, errorMsg);
} catch (Exception failTaskException) {
LOGGER.error("Unable to fail task on ack failure, taskId: {}", taskId, failTaskException);
}
Task task = executionService.getTask(taskId);
Monitors.recordAckTaskError(task.getTaskType());
failTask(task, errorMsg);
ackResult.set(false);
}
return ackResult.get();
}

/**
* Fail a task.
* Updates the task with FAILED status; On exception, fails the workflow.
* @param task
* @param errorMsg
*/
private void failTask(Task task, String errorMsg) {
TaskResult taskResult = new TaskResult();
taskResult.setStatus(TaskResult.Status.FAILED);
taskResult.setTaskId(task.getTaskId());
taskResult.setWorkflowInstanceId(task.getWorkflowInstanceId());
taskResult.setReasonForIncompletion(errorMsg);
executionService.updateTask(taskResult);
try {
TaskResult taskResult = new TaskResult();
taskResult.setStatus(TaskResult.Status.FAILED);
taskResult.setTaskId(task.getTaskId());
taskResult.setWorkflowInstanceId(task.getWorkflowInstanceId());
taskResult.setReasonForIncompletion(errorMsg);
executionService.updateTask(taskResult);
} catch (Exception e) {
LOGGER.error("Unable to fail task: {} in workflow: {}", task.getTaskId(), task.getWorkflowInstanceId(), e);
executionService.terminateWorkflow(task.getWorkflowInstanceId(), "Failed to ack task: " + task.getTaskId());
}
}

/**
Expand Down

0 comments on commit 15589bf

Please sign in to comment.