Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Do_while task relevant loop over task calculation fix #3351

Merged
merged 10 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,18 @@ private boolean isIterationComplete(
break;
}
}
return allTasksTerminal;

if (!allTasksTerminal) {
// Cases where tasks directly inside loop over are not completed.
// loopOver -> [task1 -> COMPLETED, task2 -> IN_PROGRESS]
return false;
}

// Check all the tasks in referenceNameToModel are completed or not. These are set of tasks
// which are not directly inside loopOver tasks, but they are under hierarchy
// loopOver -> [decisionTask -> COMPLETED [ task1 -> COMPLETED, task2 -> IN_PROGRESS]]
return referenceNameToModel.values().stream()
.noneMatch(taskModel -> !taskModel.getStatus().isTerminal());
}

boolean scheduleNextIteration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.tasks.TaskResult
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.common.utils.TaskUtils
import com.netflix.conductor.core.execution.tasks.Join
Expand All @@ -41,6 +42,7 @@ class DoWhileSpec extends AbstractSpecification {
'do_while_sub_workflow_integration_test.json',
'do_while_five_loop_over_integration_test.json',
'do_while_system_tasks.json',
'do_while_with_decision_task.json',
'do_while_set_variable_fix.json')
}

Expand Down Expand Up @@ -1144,6 +1146,91 @@ class DoWhileSpec extends AbstractSpecification {
}
}

def "Test workflow with Do While task contains decision task"() {
given: "The loop condition is set to use set variable"
def workflowInput = new HashMap()
def array = new ArrayList()
array.add(1);
array.add(2);
workflowInput['list'] = array

when: "A do_while workflow is started"
def workflowInstanceId = startWorkflow("DO_While_with_Decision_task", 1, "looptest", workflowInput, null)

then: "Verify that the loop over task is waiting for the wait task to get completed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.IN_PROGRESS
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "The wait task is completed"
def waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[3]
waitTask.status = Task.Status.COMPLETED
workflowExecutor.updateTask(new TaskResult(waitTask))

then: "Verify that the next iteration is scheduled and workflow is in running state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 8
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.IN_PROGRESS
tasks[0].iteration == 2
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'INLINE'
tasks[4].status == Task.Status.COMPLETED
tasks[5].taskType == 'INLINE'
tasks[5].status == Task.Status.COMPLETED
tasks[6].taskType == 'SWITCH'
tasks[6].status == Task.Status.COMPLETED
tasks[7].taskType == 'WAIT'
tasks[7].status == Task.Status.IN_PROGRESS
}

when: "The wait task is completed"
waitTask = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[7]
waitTask.status = Task.Status.COMPLETED
workflowExecutor.updateTask(new TaskResult(waitTask))

then: "Verify that the workflow is completed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 9
tasks[0].taskType == 'DO_WHILE'
tasks[0].status == Task.Status.COMPLETED
tasks[0].iteration == 2
tasks[1].taskType == 'INLINE'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SWITCH'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'WAIT'
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'INLINE'
tasks[4].status == Task.Status.COMPLETED
tasks[5].taskType == 'INLINE'
tasks[5].status == Task.Status.COMPLETED
tasks[6].taskType == 'SWITCH'
tasks[6].status == Task.Status.COMPLETED
tasks[7].taskType == 'WAIT'
tasks[7].status == Task.Status.COMPLETED
tasks[8].taskType == 'INLINE'
tasks[8].status == Task.Status.COMPLETED
}
}


void verifyTaskIteration(Task task, int iteration) {
assert task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))
assert task.iteration == iteration
Expand Down
62 changes: 62 additions & 0 deletions test-harness/src/test/resources/do_while_with_decision_task.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"name": "DO_While_with_Decision_task",
"description": "Program for testing loop behaviour",
"version": 1,
"schemaVersion": 2,
"ownerEmail": "xyz@company.eu",
"tasks": [
{
"name": "LoopTask",
"taskReferenceName": "LoopTask",
"type": "DO_WHILE",
"inputParameters": {
"list": "${workflow.input.list}"
},
"loopCondition": "$.LoopTask['iteration'] < $.list.length",
"loopOver": [
{
"name": "GetNumberAtIndex",
"taskReferenceName": "GetNumberAtIndex",
"type": "INLINE",
"inputParameters": {
"evaluatorType": "javascript",
"list": "${workflow.input.list}",
"iterator": "${LoopTask.output.iteration}",
"expression": "function getElement() { return $.list.get($.iterator - 1); } getElement();"
}
},
{
"name": "SwitchTask",
"taskReferenceName": "SwitchTask",
"type": "SWITCH",
"evaluatorType": "javascript",
"inputParameters": {
"param": "${GetNumberAtIndex.output.result}"
},
"expression": "$.param > 0",
"decisionCases": {
"true": [
{
"name": "WaitTask",
"taskReferenceName": "WaitTask",
"type": "WAIT",
"inputParameters": {
}
},
{
"name": "ComputeNumber",
"taskReferenceName": "ComputeNumber",
"type": "INLINE",
"inputParameters": {
"evaluatorType": "javascript",
"number": "${GetNumberAtIndex.output.result.number}",
"expression": "function compute() { return $.number+10; } compute();"
}
}
]
}
}
]
}
]
}