Skip to content

Commit

Permalink
ISICO-14427: take recent changes in DoWhile from Netflix/conductor
Browse files Browse the repository at this point in the history
release 3.13.3. Changes done in PR - Netflix#3351
  • Loading branch information
Raghavendra Chary B committed Mar 5, 2023
1 parent aee0e59 commit 3e4e6e7
Showing 1 changed file with 134 additions and 77 deletions.
Expand Up @@ -28,11 +28,11 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.config.ConfigProp;
import com.netflix.conductor.core.events.ScriptEvaluator;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ParametersUtils;

Expand Down Expand Up @@ -65,35 +65,52 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
}

@Override
public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowExecutor) {
public boolean execute(
Workflow workflow, Task doWhileTaskModel, WorkflowExecutor workflowExecutor) {

boolean allDone = true;
boolean hasFailures = false;
StringBuilder failureReason = new StringBuilder();
Map<String, Object> output = new HashMap<>();
task.getOutputData().put("iteration", task.getIteration());
LOGGER.debug("iteration value: {}", task.getOutputData().get("iteration"));

/*
* Get the latest set of tasks (the ones that have the highest retry count). We don't want to evaluate any tasks
* that have already failed if there is a more current one (a later retry count).
*/
Map<String, Task> relevantTasks = new LinkedHashMap<>();
Task relevantTask = null;
LOGGER.debug("workflow.getTasks() size: {}", workflow.getTasks().size());
Task relevantTask;
for (Task t : workflow.getTasks()) {
if (task.getWorkflowTask()
if (doWhileTaskModel
.getWorkflowTask()
.has(TaskUtils.removeIterationFromTaskRefName(t.getReferenceTaskName()))
&& !task.getReferenceTaskName().equals(t.getReferenceTaskName())) {
&& !doWhileTaskModel.getReferenceTaskName().equals(t.getReferenceTaskName())
&& doWhileTaskModel.getIteration() == t.getIteration()) {
relevantTask = relevantTasks.get(t.getReferenceTaskName());
if (relevantTask == null || t.getRetryCount() > relevantTask.getRetryCount()) {
relevantTasks.put(t.getReferenceTaskName(), t);
}
}
}
Collection<Task> loopOver = relevantTasks.values();
Collection<Task> loopOverTasks = relevantTasks.values();

for (Task loopOverTask : loopOver) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Workflow {} waiting for tasks {} to complete iteration {}",
workflow.getWorkflowId(),
loopOverTasks.stream()
.map(Task::getReferenceTaskName)
.collect(Collectors.toList()),
doWhileTaskModel.getIteration());
}

// if the loopOverTasks collection is empty, no tasks inside the loop have been scheduled.
// so schedule it and exit the method.
if (loopOverTasks.isEmpty()) {
doWhileTaskModel.setIteration(1);
doWhileTaskModel.getOutputData().put("iteration", doWhileTaskModel.getIteration());
return scheduleNextIteration(doWhileTaskModel, workflow, workflowExecutor);
}

for (Task loopOverTask : loopOverTasks) {
Status taskStatus = loopOverTask.getStatus();
hasFailures = !taskStatus.isSuccessful();
if (hasFailures) {
Expand All @@ -102,112 +119,151 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowEx
output.put(
TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()),
loopOverTask.getOutputData());
allDone = taskStatus.isTerminal();
if (!allDone || hasFailures) {
if (hasFailures) {
break;
}
}
task.getOutputData().put(String.valueOf(task.getIteration()), output);
LOGGER.debug(
"task.getOutputData() map for task: taskId:{}, taskReferenceName:{}",
task.getTaskId(),
task.getReferenceTaskName());
for (Map.Entry<String, Object> entry : task.getOutputData().entrySet()) {
LOGGER.debug(entry.getKey() + "=" + entry.getValue());
}
doWhileTaskModel
.getOutputData()
.put(String.valueOf(doWhileTaskModel.getIteration()), output);

if (hasFailures) {
LOGGER.debug(
"taskid {} failed in {} iteration", task.getTaskId(), task.getIteration() + 1);
return updateLoopTask(task, Status.FAILED, failureReason.toString());
} else if (!allDone) {
"Task {} failed in {} iteration",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return markTaskFailure(doWhileTaskModel, Status.FAILED, failureReason.toString());
}

if (!isIterationComplete(doWhileTaskModel, relevantTasks)) {
// current iteration is not complete (all tasks inside the loop are not terminal)
return false;
}

// if we are here, the iteration is complete, and we need to check if there is a next
// iteration by evaluating the loopCondition
boolean shouldContinue;
try {
shouldContinue = getEvaluatedCondition(workflow, task, workflowExecutor);
LOGGER.debug("taskid {} condition evaluated to {}", task.getTaskId(), shouldContinue);
shouldContinue = evaluateCondition(workflow, doWhileTaskModel);
LOGGER.debug(
"Task {} condition evaluated to {}",
doWhileTaskModel.getTaskId(),
shouldContinue);
if (shouldContinue) {
if (task.getIteration() == MAX_ALLOWED_ITERATION) {
if (doWhileTaskModel.getIteration() == MAX_ALLOWED_ITERATION) {
String message =
String.format(
"Terminating Loop <%s>. Maximum %d iteration is allowed for task id %s",
task.getReferenceTaskName(),
doWhileTaskModel.getReferenceTaskName(),
MAX_ALLOWED_ITERATION,
task.getTaskId());
doWhileTaskModel.getTaskId());
LOGGER.error(message);
LOGGER.error("Marking task {} failed with error.", task.getTaskId());
return updateLoopTask(task, Status.FAILED_WITH_TERMINAL_ERROR, message);
LOGGER.error(
"Marking task {} failed with error.", doWhileTaskModel.getTaskId());
return markTaskFailure(
doWhileTaskModel, Status.FAILED_WITH_TERMINAL_ERROR, message);
}
task.setIteration(task.getIteration() + 1);
return scheduleNextIteration(task, workflow, workflowExecutor);
doWhileTaskModel.setIteration(doWhileTaskModel.getIteration() + 1);
doWhileTaskModel.getOutputData().put("iteration", doWhileTaskModel.getIteration());
return scheduleNextIteration(doWhileTaskModel, workflow, workflowExecutor);
} else {
LOGGER.debug(
"taskid {} took {} iterations to complete",
task.getTaskId(),
task.getIteration() + 1);
return markLoopTaskSuccess(task);
"Task {} took {} iterations to complete",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getIteration() + 1);
return markTaskSuccess(doWhileTaskModel);
}
} catch (ScriptException e) {
String message =
String.format(
"Unable to evaluate condition %s , exception %s",
task.getWorkflowTask().getLoopCondition(), e.getMessage());
"Unable to evaluate condition %s, exception %s",
doWhileTaskModel.getWorkflowTask().getLoopCondition(), e.getMessage());
LOGGER.error(message);
LOGGER.error("Marking task {} failed with error.", task.getTaskId());
return updateLoopTask(task, Status.FAILED_WITH_TERMINAL_ERROR, message);
return markTaskFailure(doWhileTaskModel, Status.FAILED_WITH_TERMINAL_ERROR, message);
}
}

boolean scheduleNextIteration(Task task, Workflow workflow, WorkflowExecutor workflowExecutor) {
/**
* Check if all tasks in the current iteration have reached terminal state.
*
* @param doWhileTaskModel The {@link Task} of DO_WHILE.
* @param referenceNameToModel Map of taskReferenceName to {@link Task}.
* @return true if all tasks in DO_WHILE.loopOver are in <code>referenceNameToModel</code> and
* reached terminal state.
*/
private boolean isIterationComplete(
Task doWhileTaskModel, Map<String, Task> referenceNameToModel) {
List<WorkflowTask> workflowTasksInsideDoWhile =
doWhileTaskModel.getWorkflowTask().getLoopOver();
int iteration = doWhileTaskModel.getIteration();
boolean allTasksTerminal = true;
for (WorkflowTask workflowTaskInsideDoWhile : workflowTasksInsideDoWhile) {
String taskReferenceName =
TaskUtils.appendIteration(
workflowTaskInsideDoWhile.getTaskReferenceName(), iteration);
if (referenceNameToModel.containsKey(taskReferenceName)) {
Task taskModel = referenceNameToModel.get(taskReferenceName);
if (!taskModel.getStatus().isTerminal()) {
allTasksTerminal = false;
break;
}
} else {
allTasksTerminal = false;
break;
}
}

if (!allTasksTerminal) {
// Cases where tasks directly inside loop over are not completed.
// loopOver -> [task1 -> COMPLETED, task2 -> IN_PROGRESS]
return false;
}

// Check all the tasks in referenceNameToModel are completed or not. These are set of tasks
// which are not directly inside loopOver tasks, but they are under hierarchy
// loopOver -> [decisionTask -> COMPLETED [ task1 -> COMPLETED, task2 -> IN_PROGRESS]]
return referenceNameToModel.values().stream()
.noneMatch(taskModel -> !taskModel.getStatus().isTerminal());
}

boolean scheduleNextIteration(
Task doWhileTaskModel, Workflow workflow, WorkflowExecutor workflowExecutor) {
LOGGER.debug(
"Scheduling loop tasks for taskid {} as condition {} evaluated to true",
task.getTaskId(),
task.getWorkflowTask().getLoopCondition());
workflowExecutor.scheduleNextIteration(task, workflow);
"Scheduling loop tasks for task {} as condition {} evaluated to true",
doWhileTaskModel.getTaskId(),
doWhileTaskModel.getWorkflowTask().getLoopCondition());
workflowExecutor.scheduleNextIteration(doWhileTaskModel, workflow);
return true; // Return true even though status not changed. Iteration has to be updated in
// execution DAO.
}

boolean updateLoopTask(Task task, Status status, String failureReason) {
task.setReasonForIncompletion(failureReason);
task.setStatus(status);
boolean markTaskFailure(Task taskModel, Status status, String failureReason) {
LOGGER.error("Marking task {} failed with error.", taskModel.getTaskId());
taskModel.setReasonForIncompletion(failureReason);
taskModel.setStatus(status);
return true;
}

boolean markLoopTaskSuccess(Task task) {
boolean markTaskSuccess(Task taskModel) {
LOGGER.debug(
"taskid {} took {} iterations to complete",
task.getTaskId(),
task.getIteration() + 1);
task.setStatus(Status.COMPLETED);
"Task {} took {} iterations to complete",
taskModel.getTaskId(),
taskModel.getIteration() + 1);
taskModel.setStatus(Status.COMPLETED);
return true;
}

@VisibleForTesting
boolean getEvaluatedCondition(Workflow workflow, Task task, WorkflowExecutor workflowExecutor)
throws ScriptException {
TaskDef taskDefinition = null;
try {
taskDefinition = workflowExecutor.getTaskDefinition(task);
} catch (TerminateWorkflowException e) {
// It is ok to not have a task definition for a DO_WHILE task
}

Map<String, Object> taskInput =
boolean evaluateCondition(Workflow workflow, Task task) throws ScriptException {
TaskDef taskDefinition = task.getTaskDefinition().orElse(null);
// Use paramUtils to compute the task input
Map<String, Object> conditionInput =
parametersUtils.getTaskInputV2(
task.getWorkflowTask().getInputParameters(),
workflow,
task.getTaskId(),
taskDefinition);
taskInput.put(task.getReferenceTaskName(), task.getOutputData());
LOGGER.debug(
"taskInput map for task: taskId:{}, taskReferenceName:{}",
task.getTaskId(),
task.getReferenceTaskName());
for (Map.Entry<String, Object> entry : taskInput.entrySet()) {
LOGGER.debug(entry.getKey() + "=" + entry.getValue());
}
conditionInput.put(task.getReferenceTaskName(), task.getOutputData());
List<Task> loopOver =
workflow.getTasks().stream()
.filter(
Expand All @@ -223,17 +279,18 @@ boolean getEvaluatedCondition(Workflow workflow, Task task, WorkflowExecutor wor
.collect(Collectors.toList());

for (Task loopOverTask : loopOver) {
taskInput.put(
conditionInput.put(
TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()),
loopOverTask.getOutputData());
}

String condition = task.getWorkflowTask().getLoopCondition();
boolean shouldContinue = false;
boolean result = false;
if (condition != null) {
LOGGER.debug("Condition: {} is being evaluated", condition);
// Evaluate the expression by using the Nashhorn based script evaluator
shouldContinue = ScriptEvaluator.evalBool(condition, taskInput);
// Evaluate the expression by using the Nashorn based script evaluator
result = ScriptEvaluator.evalBool(condition, conditionInput);
}
return shouldContinue;
return result;
}
}

0 comments on commit 3e4e6e7

Please sign in to comment.