Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix for terminate tasks within a decision branch
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jan 29, 2021
1 parent 3d30189 commit 6784379
Show file tree
Hide file tree
Showing 13 changed files with 398 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
import com.netflix.conductor.common.constraints.TaskReferenceNameUniqueConstraint;
import com.netflix.conductor.common.metadata.Auditable;
import com.netflix.conductor.common.metadata.tasks.TaskType;

import javax.validation.Valid;
import javax.validation.constraints.Email;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import javax.validation.Valid;
import javax.validation.constraints.Email;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

@ProtoMessage
@TaskReferenceNameUniqueConstraint
Expand Down Expand Up @@ -304,6 +304,11 @@ public static String getKey(String name, int version) {
}

public WorkflowTask getNextTask(String taskReferenceName) {
WorkflowTask workflowTask = getTaskByRefName(taskReferenceName);
if (workflowTask != null && TaskType.TERMINATE.name().equals(workflowTask.getType())) {
return null;
}

Iterator<WorkflowTask> iterator = tasks.iterator();
while (iterator.hasNext()) {
WorkflowTask task = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ public WorkflowTask next(String taskReferenceName, WorkflowTask parent) {
}
break;
case DYNAMIC:
case TERMINATE:
case SIMPLE:
return null;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
*/
package com.netflix.conductor.core.execution;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED_WITH_ERRORS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
Expand All @@ -33,13 +41,6 @@
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.metrics.Monitors;
import java.time.Duration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -50,13 +51,12 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED_WITH_ERRORS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
* Decider evaluates the state of the workflow by inspecting the current state along with the blueprint. The result of
Expand All @@ -78,6 +78,10 @@ public class DeciderService {
private final Predicate<Task> isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED)
&& !task.isExecuted();

private final Predicate<Workflow> containsSuccessfulTerminateTask = workflow -> workflow.getTasks().stream()
.anyMatch(task -> TERMINATE.name().equals(task.getTaskType())
&& task.getStatus().isTerminal() && task.getStatus().isSuccessful());

public DeciderService(ParametersUtils parametersUtils, MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
@Qualifier("taskProcessorsMap") Map<String, TaskMapper> taskMappers,
Expand Down Expand Up @@ -221,15 +225,17 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow)) {
if (containsSuccessfulTerminateTask.test(workflow) || (outcome.tasksToBeScheduled.isEmpty()
&& checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
outcome.isComplete = true;
}

return outcome;
}

protected List<Task> filterNextLoopOverTasks(List<Task> tasks, Task pendingTask, Workflow workflow) {
@VisibleForTesting
List<Task> filterNextLoopOverTasks(List<Task> tasks, Task pendingTask, Workflow workflow) {

//Update the task reference name and iteration
tasks.forEach(nextTask -> {
Expand Down Expand Up @@ -298,20 +304,32 @@ private List<Task> startWorkflow(Workflow workflow) throws TerminateWorkflowExce
*
* @param workflow the workflow instance
* @param task if not null, the output of this task will be copied to workflow output if no output parameters
* are specified in the workflow defintion if null, the output of the last task in the workflow will
* be copied to workflow output of no output parameters are specified in the workflow definition
* are specified in the workflow definition if null, the output of the last task in the workflow
* will be copied to workflow output of no output parameters are specified in the workflow
* definition
*/
void updateWorkflowOutput(final Workflow workflow, Task task) {
List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return;
}

Optional<Task> terminateTask = allTasks.stream()
.filter(t -> TaskType.TERMINATE.name().equals(t.getTaskType()) && t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
if (!terminateTask.get().getOutputData().isEmpty()) {
workflow.setOutput(terminateTask.get().getOutputData());
}
return;
}

Task last = Optional.ofNullable(task).orElse(allTasks.get(allTasks.size() - 1));

WorkflowDef workflowDef = workflow.getWorkflowDefinition();
Map<String, Object> output;
if (workflowDef.getOutputParameters() != null && !workflowDef.getOutputParameters().isEmpty() && !(TaskType.TERMINATE.name().equals(last.getTaskType()))) {
if (workflowDef.getOutputParameters() != null && !workflowDef.getOutputParameters().isEmpty()) {
Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
output = parametersUtils.getTaskInput(workflowDef.getOutputParameters(), workflowInstance, null, null);
} else if (StringUtils.isNotBlank(last.getExternalOutputPayloadStoragePath())) {
Expand All @@ -326,12 +344,17 @@ void updateWorkflowOutput(final Workflow workflow, Task task) {
externalizeWorkflowData(workflow);
}

private boolean checkForWorkflowCompletion(final Workflow workflow) throws TerminateWorkflowException {
@VisibleForTesting
boolean checkForWorkflowCompletion(final Workflow workflow) throws TerminateWorkflowException {
List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return false;
}

if (containsSuccessfulTerminateTask.test(workflow)) {
return true;
}

Map<String, Status> taskStatusMap = new HashMap<>();
workflow.getTasks().forEach(task -> taskStatusMap.put(task.getReferenceTaskName(), task.getStatus()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
Expand Down Expand Up @@ -656,20 +657,43 @@ public Task getPendingTaskByWorkflow(String taskReferenceName, String workflowId
.orElse(null);
}

private void endExecution(Workflow workflow) {
Optional<Task> terminateTask = workflow.getTasks().stream()
.filter(t -> TERMINATE.name().equals(t.getTaskType()) && t.getStatus().isTerminal()
&& t.getStatus().isSuccessful())
.findFirst();
if (terminateTask.isPresent()) {
String terminationStatus =
(String) terminateTask.get().getInputData().get(Terminate.getTerminationStatusParameter());
String reason = String
.format("Workflow is %s by TERMINATE task: %s", terminationStatus, terminateTask.get().getTaskId());
if (WorkflowStatus.FAILED.name().equals(terminationStatus)) {
workflow.setStatus(WorkflowStatus.FAILED);
workflow = terminate(workflow, new TerminateWorkflowException(reason));
} else {
workflow.setReasonForIncompletion(reason);
workflow = completeWorkflow(workflow);
}
} else {
workflow = completeWorkflow(workflow);
}
cancelNonTerminalTasks(workflow);
}

/**
* @param wf the workflow to be completed
* @throws ApplicationException if workflow is not in terminal state
*/
@VisibleForTesting
void completeWorkflow(Workflow wf) {
Workflow completeWorkflow(Workflow wf) {
LOGGER.debug("Completing workflow execution for {}", wf.getWorkflowId());
Workflow workflow = executionDAOFacade.getWorkflowById(wf.getWorkflowId(), false);

if (workflow.getStatus().equals(WorkflowStatus.COMPLETED)) {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
executionDAOFacade.removeFromPendingWorkflow(workflow.getWorkflowName(), workflow.getWorkflowId());
LOGGER.debug("Workflow: {} has already been completed.", wf.getWorkflowId());
return;
return wf;
}

if (workflow.getStatus().isTerminal()) {
Expand All @@ -687,6 +711,7 @@ void completeWorkflow(Workflow wf) {
workflow.setStatus(WorkflowStatus.COMPLETED);
workflow.setTasks(wf.getTasks());
workflow.setOutput(wf.getOutput());
workflow.setReasonForIncompletion(wf.getReasonForIncompletion());
workflow.setExternalOutputPayloadStoragePath(wf.getExternalOutputPayloadStoragePath());
executionDAOFacade.updateWorkflow(workflow);
LOGGER.debug("Completed workflow execution for {}", workflow.getWorkflowId());
Expand All @@ -705,6 +730,7 @@ void completeWorkflow(Workflow wf) {

executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
return workflow;
}

public void terminateWorkflow(String workflowId, String reason) {
Expand All @@ -721,7 +747,7 @@ public void terminateWorkflow(String workflowId, String reason) {
* @param reason the reason for termination
* @param failureWorkflow the failure workflow (if any) to be triggered as a result of this termination
*/
public void terminateWorkflow(Workflow workflow, String reason, String failureWorkflow) {
public Workflow terminateWorkflow(Workflow workflow, String reason, String failureWorkflow) {
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

Expand Down Expand Up @@ -796,10 +822,17 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflowStatusListener.onWorkflowTerminated(workflow);
}

if (!erroredTasks.isEmpty()) {
if (erroredTasks.isEmpty()) {
try {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
}
} else {
throw new ApplicationException(Code.INTERNAL_ERROR, String.format("Error canceling system tasks: %s",
String.join(",", erroredTasks)));
}
return workflow;
} finally {
executionLockService.releaseLock(workflow.getWorkflowId());
executionLockService.deleteLock(workflow.getWorkflowId());
Expand Down Expand Up @@ -1024,7 +1057,7 @@ public boolean decide(String workflowId) {
try {
DeciderService.DeciderOutcome outcome = deciderService.decide(workflow);
if (outcome.isComplete) {
completeWorkflow(workflow);
endExecution(workflow);
return true;
}

Expand All @@ -1040,44 +1073,6 @@ public boolean decide(String workflowId) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
// FIXME: temporary hack to workaround TERMINATE task
if (TERMINATE.name().equals(task.getTaskType())) {
deciderService.externalizeTaskData(task);
executionDAOFacade.updateTask(task);
workflow.setOutput(workflowInstance.getOutput());
List<Task> terminateTasksToBeUpdated = new ArrayList<Task>();
/*
* The TERMINATE task completes the workflow but does not do anything with SCHEDULED or IN_PROGRESS tasks to complete them
*/
for (Task workflowTask : workflow.getTasks()) {
if (workflowTask != task && !workflowTask.getStatus().isTerminal()) {
workflowTask.setStatus(SKIPPED);
terminateTasksToBeUpdated.add(workflowTask);
}
}
/*
* Now find nested subworkflows that also need to have their tasks skipped
*/
for (Task workflowTask : workflow.getTasks()) {
if (SUB_WORKFLOW.name().equals(workflowTask.getTaskType()) && StringUtils
.isNotBlank(workflowTask.getSubWorkflowId())) {
Workflow subWorkflow = executionDAOFacade
.getWorkflowById(workflowTask.getSubWorkflowId(), true);
if (subWorkflow != null) {
skipTasksAffectedByTerminateTask(subWorkflow);
}
}
}
executionDAOFacade.updateTasks(terminateTasksToBeUpdated);
if (workflowInstance.getStatus().equals(WorkflowStatus.COMPLETED)) {
completeWorkflow(workflow);
} else {
workflow.setStatus(workflowInstance.getStatus());
terminate(workflow, new TerminateWorkflowException(
"Workflow is FAILED by TERMINATE task: " + task.getTaskId()));
}
return true;
}
deciderService.externalizeTaskData(task);
tasksToBeUpdated.add(task);
stateChanged = true;
Expand Down Expand Up @@ -1162,13 +1157,6 @@ List<String> cancelNonTerminalTasks(Workflow workflow) {
executionDAOFacade.updateTask(task);
}
}
if (erroredTasks.isEmpty()) {
try {
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error("Error removing workflow: {} from decider queue", workflow.getWorkflowId(), e);
}
}
return erroredTasks;
}

Expand Down Expand Up @@ -1420,8 +1408,8 @@ void setTaskDomains(List<Task> tasks, Workflow workflow) {

/**
* Gets the active domain from the list of domains where the task is to be queued. The domain list must be ordered.
* In sequence, check if any worker has polled for last `activeWorkerLastPollMs`, if so that is the
* Active domain. When no active domains are found:
* In sequence, check if any worker has polled for last `activeWorkerLastPollMs`, if so that is the Active domain.
* When no active domains are found:
* <li> If NO_DOMAIN token is provided, return null.
* <li> Else, return last domain from list.
*
Expand Down Expand Up @@ -1582,7 +1570,7 @@ private void addTaskToQueue(final List<Task> tasks) {
}
}

private void terminate(final Workflow workflow, TerminateWorkflowException terminateWorkflowException) {
private Workflow terminate(final Workflow workflow, TerminateWorkflowException terminateWorkflowException) {
if (!workflow.getStatus().isTerminal()) {
workflow.setStatus(terminateWorkflowException.getWorkflowStatus());
}
Expand All @@ -1598,7 +1586,7 @@ private void terminate(final Workflow workflow, TerminateWorkflowException termi
if (terminateWorkflowException.getTask() != null) {
executionDAOFacade.updateTask(terminateWorkflowException.getTask());
}
terminateWorkflow(workflow, terminateWorkflowException.getMessage(), failureWorkflow);
return terminateWorkflow(workflow, terminateWorkflowException.getMessage(), failureWorkflow);
}

private boolean rerunWF(String workflowId, String taskId, Map<String, Object> taskInput,
Expand Down
Loading

0 comments on commit 6784379

Please sign in to comment.