Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

backported fix for termiante task from #2056 #2093

Merged
merged 1 commit into from Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -311,6 +311,11 @@ public static String getKey(String name, int version){
}

public WorkflowTask getNextTask(String taskReferenceName){
WorkflowTask workflowTask = getTaskByRefName(taskReferenceName);
if (workflowTask != null && TaskType.TERMINATE.name().equals(workflowTask.getType())) {
return null;
}

Iterator<WorkflowTask> it = tasks.iterator();
while(it.hasNext()){
WorkflowTask task = it.next();
Expand Down
Expand Up @@ -628,6 +628,7 @@ public WorkflowTask next(String taskReferenceName, WorkflowTask parent) {
}
break;
case DYNAMIC:
case TERMINATE:
case SIMPLE:
return null;
default:
Expand Down
Expand Up @@ -18,6 +18,7 @@
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TERMINATE;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand Down Expand Up @@ -74,6 +75,10 @@ public class DeciderService {

private final Predicate<Task> isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted();

private final Predicate<Workflow> containsSuccessfulTerminateTask = workflow -> workflow.getTasks().stream()
.anyMatch(task -> TERMINATE.name().equals(task.getTaskType())
&& task.getStatus().isTerminal() && task.getStatus().isSuccessful());

private static final String PENDING_TASK_TIME_THRESHOLD_PROPERTY_NAME = "workflow.task.pending.time.threshold.minutes";

@Inject
Expand Down Expand Up @@ -215,15 +220,17 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow)) {
if (containsSuccessfulTerminateTask.test(workflow) || (outcome.tasksToBeScheduled.isEmpty()
&& checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
outcome.isComplete = true;
}

return outcome;
}

protected List<Task> filterNextLoopOverTasks(List<Task> tasks, Task pendingTask, Workflow workflow) {
@VisibleForTesting
List<Task> filterNextLoopOverTasks(List<Task> tasks, Task pendingTask, Workflow workflow) {

//Update the task reference name and iteration
tasks.forEach(nextTask -> {
Expand Down Expand Up @@ -295,11 +302,22 @@ void updateWorkflowOutput(final Workflow workflow, @Nullable Task task) {
return;
}

Optional<Task> terminateTask = allTasks.stream()
.filter(t -> TaskType.TERMINATE.name().equals(t.getTaskType()) && t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
if (!terminateTask.get().getOutputData().isEmpty()) {
workflow.setOutput(terminateTask.get().getOutputData());
}
return;
}

Task last = Optional.ofNullable(task).orElse(allTasks.get(allTasks.size() - 1));

WorkflowDef workflowDef = workflow.getWorkflowDefinition();
Map<String, Object> output;
if (workflowDef.getOutputParameters() != null && !workflowDef.getOutputParameters().isEmpty() && !(TaskType.TERMINATE.name().equals(last.getTaskType()))) {
if (workflowDef.getOutputParameters() != null && !workflowDef.getOutputParameters().isEmpty()) {
Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
output = parametersUtils.getTaskInput(workflowDef.getOutputParameters(), workflowInstance, null, null);
} else if (StringUtils.isNotBlank(last.getExternalOutputPayloadStoragePath())) {
Expand All @@ -313,12 +331,17 @@ void updateWorkflowOutput(final Workflow workflow, @Nullable Task task) {
externalizeWorkflowData(workflow);
}

private boolean checkForWorkflowCompletion(final Workflow workflow) throws TerminateWorkflowException {
@VisibleForTesting
boolean checkForWorkflowCompletion(final Workflow workflow) throws TerminateWorkflowException {
List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return false;
}

if (containsSuccessfulTerminateTask.test(workflow)) {
return true;
}

Map<String, Status> taskStatusMap = new HashMap<>();
workflow.getTasks().forEach(task -> taskStatusMap.put(task.getReferenceTaskName(), task.getStatus()));

Expand Down
Expand Up @@ -47,6 +47,7 @@
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
Expand Down Expand Up @@ -678,20 +679,43 @@ public Task getPendingTaskByWorkflow(String taskReferenceName, String workflowId
.orElse(null);
}

private void endExecution(Workflow workflow) {
Optional<Task> terminateTask = workflow.getTasks().stream()
.filter(t -> TERMINATE.name().equals(t.getTaskType()) && t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
String terminationStatus =
(String) terminateTask.get().getInputData().get(Terminate.getTerminationStatusParameter());
String reason = String
.format("Workflow is %s by TERMINATE task: %s", terminationStatus, terminateTask.get().getTaskId());
if (WorkflowStatus.FAILED.name().equals(terminationStatus)) {
workflow.setStatus(WorkflowStatus.FAILED);
workflow = terminate(workflow, new TerminateWorkflowException(reason));
} else {
workflow.setReasonForIncompletion(reason);
workflow = completeWorkflow(workflow);
}
} else {
workflow = completeWorkflow(workflow);
}
cancelNonTerminalTasks(workflow);
}

/**
* @param wf the workflow to be completed
* @throws ApplicationException if workflow is not in terminal state
*/
@VisibleForTesting
void completeWorkflow(Workflow wf) {
Workflow completeWorkflow(Workflow wf) {
LOGGER.debug("Completing workflow execution for {}", wf.getWorkflowId());
Workflow workflow = executionDAOFacade.getWorkflowById(wf.getWorkflowId(), false);

if (workflow.getStatus().equals(WorkflowStatus.COMPLETED)) {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
executionDAOFacade.removeFromPendingWorkflow(workflow.getWorkflowName(), workflow.getWorkflowId());
LOGGER.debug("Workflow: {} has already been completed.", wf.getWorkflowId());
return;
return wf;
}

if (workflow.getStatus().isTerminal()) {
Expand All @@ -709,23 +733,25 @@ void completeWorkflow(Workflow wf) {
workflow.setStatus(WorkflowStatus.COMPLETED);
workflow.setTasks(wf.getTasks());
workflow.setOutput(wf.getOutput());
workflow.setReasonForIncompletion(wf.getReasonForIncompletion());
workflow.setExternalOutputPayloadStoragePath(wf.getExternalOutputPayloadStoragePath());
executionDAOFacade.updateWorkflow(workflow);
LOGGER.debug("Completed workflow execution for {}", workflow.getWorkflowId());

if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowCompleted(workflow);
}

if (StringUtils.isNotEmpty(workflow.getParentWorkflowId())) {
updateParentWorkflowTask(workflow);
decide(workflow.getParentWorkflowId());
}
Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), workflow.getOwnerApp());
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue

if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
workflowStatusListener.onWorkflowCompleted(workflow);
}

executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
return workflow;
}

public void terminateWorkflow(String workflowId, String reason) {
Expand All @@ -742,7 +768,7 @@ public void terminateWorkflow(String workflowId, String reason) {
* @param reason the reason for termination
* @param failureWorkflow the failure workflow (if any) to be triggered as a result of this termination
*/
public void terminateWorkflow(Workflow workflow, String reason, String failureWorkflow) {
public Workflow terminateWorkflow(Workflow workflow, String reason, String failureWorkflow) {
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

Expand Down Expand Up @@ -815,10 +841,17 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflowStatusListener.onWorkflowTerminated(workflow);
}

if (!erroredTasks.isEmpty()) {
if (erroredTasks.isEmpty()) {
try {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
}
} else {
throw new ApplicationException(Code.INTERNAL_ERROR, String.format("Error canceling system tasks: %s",
String.join(",", erroredTasks)));
}
return workflow;
} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -1029,7 +1062,7 @@ public boolean decide(String workflowId) {
try {
DeciderService.DeciderOutcome outcome = deciderService.decide(workflow);
if (outcome.isComplete) {
completeWorkflow(workflow);
endExecution(workflow);
return true;
}

Expand All @@ -1045,41 +1078,6 @@ public boolean decide(String workflowId) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
// FIXME: temporary hack to workaround TERMINATE task
if (TERMINATE.name().equals(task.getTaskType())) {
deciderService.externalizeTaskData(task);
executionDAOFacade.updateTask(task);
workflow.setOutput(workflowInstance.getOutput());
List<Task> terminateTasksToBeUpdated = new ArrayList<Task>();
/*
* The TERMINATE task completes the workflow but does not do anything with SCHEDULED or IN_PROGRESS tasks to complete them
*/
for(Task workflowTask : workflow.getTasks()) {
if(workflowTask != task && !workflowTask.getStatus().isTerminal()) {
workflowTask.setStatus(SKIPPED);
terminateTasksToBeUpdated.add(workflowTask);
}
}
/*
* Now find nested subworkflows that also need to have their tasks skipped
*/
for(Task workflowTask : workflow.getTasks()) {
if(TaskType.SUB_WORKFLOW.name().equals(workflowTask.getTaskType()) && StringUtils.isNotBlank(workflowTask.getSubWorkflowId())) {
Workflow subWorkflow = executionDAOFacade.getWorkflowById(workflowTask.getSubWorkflowId(), true);
if(subWorkflow != null) {
skipTasksAffectedByTerminateTask(subWorkflow);
}
}
}
executionDAOFacade.updateTasks(terminateTasksToBeUpdated);
if(workflowInstance.getStatus().equals(WorkflowStatus.COMPLETED)) {
completeWorkflow(workflow);
} else {
workflow.setStatus(workflowInstance.getStatus());
terminate(workflow, new TerminateWorkflowException("Workflow is FAILED by TERMINATE task: " + task.getTaskId()));
}
return true;
}
deciderService.externalizeTaskData(task);
tasksToBeUpdated.add(task);
stateChanged = true;
Expand Down Expand Up @@ -1162,13 +1160,6 @@ List<String> cancelNonTerminalTasks(Workflow workflow) {
executionDAOFacade.updateTask(task);
}
}
if (erroredTasks.isEmpty()) {
try {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
}
}
return erroredTasks;
}

Expand Down Expand Up @@ -1567,7 +1558,7 @@ private void addTaskToQueue(final List<Task> tasks) {
}
}

private void terminate(final Workflow workflow, TerminateWorkflowException tw) {
private Workflow terminate(final Workflow workflow, TerminateWorkflowException tw) {
if (!workflow.getStatus().isTerminal()) {
workflow.setStatus(tw.workflowStatus);
}
Expand All @@ -1583,7 +1574,7 @@ private void terminate(final Workflow workflow, TerminateWorkflowException tw) {
if (tw.task != null) {
executionDAOFacade.updateTask(tw.task);
}
terminateWorkflow(workflow, tw.getMessage(), failureWorkflow);
return terminateWorkflow(workflow, tw.getMessage(), failureWorkflow);
}

private boolean rerunWF(String workflowId, String taskId, Map<String, Object> taskInput,
Expand Down
Expand Up @@ -106,6 +106,7 @@ private boolean verifyAndRepairDeciderQueue(Workflow workflow) {
String queueName = WorkflowExecutor.DECIDER_QUEUE;
if (!queueDAO.containsMessage(queueName, workflow.getWorkflowId())) {
queueDAO.push(queueName, workflow.getWorkflowId(), configuration.getSweepFrequency());
LOGGER.info("Workflow {} re-queued for repairs", workflow.getWorkflowId());
Monitors.recordQueueMessageRepushFromRepairService(queueName);
return true;
}
Expand All @@ -125,6 +126,7 @@ protected boolean verifyAndRepairTask(Task task) {
String taskQueueName = QueueUtils.getQueueName(task);
if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) {
queueDAO.push(taskQueueName, task.getTaskId(), task.getCallbackAfterSeconds());
LOGGER.info("Task {} in workflow {} re-queued for repairs", task.getTaskId(), task.getWorkflowInstanceId());
Monitors.recordQueueMessageRepushFromRepairService(task.getTaskDefName());
return true;
}
Expand Down
Expand Up @@ -148,7 +148,10 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) {
}
Workflow subWorkflow = provider.getWorkflow(workflowId, true);
subWorkflow.setStatus(WorkflowStatus.TERMINATED);
provider.terminateWorkflow(subWorkflow, "Parent workflow has been terminated with status " + workflow.getStatus(), null);
String reason = StringUtils.isEmpty(workflow.getReasonForIncompletion())
? "Parent workflow has been terminated with status " + workflow.getStatus()
: "Parent workflow has been terminated with reason: " + workflow.getReasonForIncompletion();
provider.terminateWorkflow(subWorkflow, reason, null);
}

@Override
Expand Down
Expand Up @@ -55,9 +55,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor)
String returnStatus = (String) task.getInputData().get(TERMINATION_STATUS_PARAMETER);

if(validateInputStatus(returnStatus)) {
workflow.setStatus(Workflow.WorkflowStatus.valueOf(returnStatus));
task.setOutputData(getInputFromParam(task.getInputData()));
setWorkflowOutput(task.getOutputData(), workflow);
task.setStatus(Task.Status.COMPLETED);
return true;
}
Expand All @@ -78,12 +76,7 @@ public static Boolean validateInputStatus(String status) {
return COMPLETED.name().equals(status) || FAILED.name().equals(status);
}

private void setWorkflowOutput(Map<String, Object> taskOutput, Workflow workflow) {
if(!taskOutput.isEmpty()) {
workflow.setOutput(taskOutput);
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> getInputFromParam(Map<String, Object> taskInput) {
HashMap<String, Object> output = new HashMap<>();
if(taskInput.get(TERMINATION_WORKFLOW_OUTPUT) == null) {
Expand Down