diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index 3571c46119..60484eeb28 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -300,6 +300,9 @@ public WorkflowTask getNextTask(String taskReferenceName){ WorkflowTask nextTask = task.next(taskReferenceName, null); if(nextTask != null){ return nextTask; + } else if (TaskType.DO_WHILE.name().equals(task.getType()) && !task.getTaskReferenceName().equals(taskReferenceName) && task.has(taskReferenceName)) { + // If the task is child of Loop Task and at last position, return null. + return null; } if(task.getTaskReferenceName().equals(taskReferenceName) || task.has(taskReferenceName)){ diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index f1918d829f..3f52e575a9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -192,7 +192,7 @@ private DeciderOutcome decide(final Workflow workflow, List preScheduledTa if (!pendingTask.isExecuted() && !pendingTask.isRetried() && pendingTask.getStatus().isTerminal()) { pendingTask.setExecuted(true); List nextTasks = getNextTask(workflow, pendingTask); - if (pendingTask.isLoopOverTask() && !nextTasks.isEmpty()) { + if (pendingTask.isLoopOverTask() && !TaskType.DO_WHILE.name().equals(pendingTask.getTaskType()) && !nextTasks.isEmpty()) { nextTasks = filterNextLoopOverTasks(nextTasks, pendingTask, workflow); } nextTasks.forEach(nextTask -> tasksToBeScheduled.putIfAbsent(nextTask.getReferenceTaskName(), nextTask)); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java index b057f4fb16..34f229ee98 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/DoWhile.java @@ -22,6 +22,7 @@ import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.utils.TaskUtils; import com.netflix.conductor.core.events.ScriptEvaluator; @@ -65,7 +66,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowEx StringBuilder failureReason = new StringBuilder(); Map output = new HashMap<>(); task.getOutputData().put("iteration", task.getIteration()); - List loopOver = workflow.getTasks().stream().filter(t -> (t.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))) && t.isLoopOverTask()).collect(Collectors.toList()); + List loopOver = workflow.getTasks().stream().filter(t -> (task.getWorkflowTask().has(TaskUtils.removeIterationFromTaskRefName(t.getReferenceTaskName())) && !task.getReferenceTaskName().equals(t.getReferenceTaskName()))).collect(Collectors.toList()); for (Task loopOverTask : loopOver) { Status taskStatus = loopOverTask.getStatus(); @@ -73,7 +74,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor workflowEx if (hasFailures) { failureReason.append(loopOverTask.getReasonForIncompletion()).append(" "); } - output.put(loopOverTask.getReferenceTaskName(), loopOverTask.getOutputData()); + output.put(TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()), loopOverTask.getOutputData()); allDone = taskStatus.isTerminal(); if (!allDone || hasFailures) { break; @@ -129,10 +130,10 @@ boolean getEvaluatedCondition(Workflow workflow, Task task, WorkflowExecutor wor TaskDef taskDefinition = workflowExecutor.getTaskDefinition(task); Map taskInput = parametersUtils.getTaskInputV2(task.getWorkflowTask().getInputParameters(), workflow, task.getTaskId(), taskDefinition); taskInput.put(task.getReferenceTaskName(), task.getOutputData()); - List loopOver = workflow.getTasks().stream().filter(t -> (t.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))) && t.isLoopOverTask()).collect(Collectors.toList()); + List loopOver = workflow.getTasks().stream().filter(t -> (task.getWorkflowTask().has(TaskUtils.removeIterationFromTaskRefName(t.getReferenceTaskName())) && !task.getReferenceTaskName().equals(t.getReferenceTaskName()))).collect(Collectors.toList()); for (Task loopOverTask : loopOver) { - taskInput.put(loopOverTask.getReferenceTaskName().split(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))[0], loopOverTask.getOutputData()); + taskInput.put(TaskUtils.removeIterationFromTaskRefName(loopOverTask.getReferenceTaskName()), loopOverTask.getOutputData()); } String condition = task.getWorkflowTask().getLoopCondition(); boolean shouldContinue = false; diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/DoWhileTest.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/DoWhileTest.java index e8184f4914..2e89eb508b 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/DoWhileTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/DoWhileTest.java @@ -81,21 +81,21 @@ public void setup() { provider = spy(new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config, executionLockService)); loopWorkflowTask1 = new WorkflowTask(); - loopWorkflowTask1.setTaskReferenceName("task1__1"); - loopWorkflowTask1.setName("task1__1"); + loopWorkflowTask1.setTaskReferenceName("task1"); + loopWorkflowTask1.setName("task1"); loopWorkflowTask2 = new WorkflowTask(); - loopWorkflowTask2.setTaskReferenceName("task2__1"); - loopWorkflowTask2.setName("task2__1"); + loopWorkflowTask2.setTaskReferenceName("task2"); + loopWorkflowTask2.setName("task2"); task1 = new Task(); task1.setWorkflowTask(loopWorkflowTask1); - task1.setReferenceTaskName("task1__1"); + task1.setReferenceTaskName("task1"); task1.setStatus(Task.Status.COMPLETED); task1.setTaskType(TaskType.HTTP.name()); task1.setInputData(new HashMap<>()); task1.setIteration(1); task2 = new Task(); task2.setWorkflowTask(loopWorkflowTask2); - task2.setReferenceTaskName("task2__1"); + task2.setReferenceTaskName("task2"); task2.setStatus(Task.Status.COMPLETED); task2.setTaskType(TaskType.HTTP.name()); task2.setInputData(new HashMap<>()); @@ -107,6 +107,7 @@ public void setup() { loopTask.setIteration(1); loopWorkflowTask = new WorkflowTask(); loopWorkflowTask.setTaskReferenceName("loopTask"); + loopWorkflowTask.setType(TaskType.DO_WHILE.name()); loopWorkflowTask.setName("loopTask"); loopWorkflowTask.setLoopCondition("if ($.loopTask['iteration'] < 1) { false; } else { true; }"); loopWorkflowTask.setLoopOver(Arrays.asList(task1.getWorkflowTask(), task2.getWorkflowTask())); diff --git a/docs/docs/configuration/systask.md b/docs/docs/configuration/systask.md index 571597546f..06e874a14b 100644 --- a/docs/docs/configuration/systask.md +++ b/docs/docs/configuration/systask.md @@ -576,7 +576,7 @@ Each iteration of loop over task will be scheduled as taskRefname__iteration. It Do while task output number of iterations with iteration as key and value as number of iterations. Each iteration's output will be stored as, iteration as key and loopover task's output as value Taskname which contains arithmetic operator must not be used in loopCondition. Any of loopOver task can be reference outside do while task same way other tasks are referenced. To reference specific iteration's output, ```$.LoopTask['iteration]['first_task']``` -Do while task does NOT support domain or isolation group execution. +Do while task does NOT support domain or isolation group execution. Nesting of DO_WHILE task is not supported. Loopover task must not be reused in neither workflow nor another DO_WHILE task. **Parameters:** diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java index db3a025129..de2eea8d3f 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java @@ -724,7 +724,7 @@ public void testDoWhileTwoIteration() throws Exception { TaskDef taskDef = new TaskDef(); taskDef.setName("http1"); - taskDef.setTimeoutSeconds(2); + taskDef.setTimeoutSeconds(5); taskDef.setRetryCount(1); taskDef.setTimeoutPolicy(TimeoutPolicy.RETRY); taskDef.setRetryDelaySeconds(10); @@ -732,7 +732,7 @@ public void testDoWhileTwoIteration() throws Exception { TaskDef taskDef2 = new TaskDef(); taskDef2.setName("http0"); - taskDef2.setTimeoutSeconds(2); + taskDef2.setTimeoutSeconds(5); taskDef2.setRetryCount(1); taskDef2.setTimeoutPolicy(TimeoutPolicy.RETRY); taskDef2.setRetryDelaySeconds(10); @@ -740,12 +740,20 @@ public void testDoWhileTwoIteration() throws Exception { TaskDef taskDef1 = new TaskDef(); taskDef1.setName("http2"); - taskDef1.setTimeoutSeconds(2); + taskDef1.setTimeoutSeconds(5); taskDef1.setRetryCount(1); taskDef1.setTimeoutPolicy(TimeoutPolicy.RETRY); taskDef1.setRetryDelaySeconds(10); metadataService.registerTaskDef(Arrays.asList(taskDef1)); + TaskDef taskDef3 = new TaskDef(); + taskDef1.setName("http3"); + taskDef1.setTimeoutSeconds(5); + taskDef1.setRetryCount(1); + taskDef1.setTimeoutPolicy(TimeoutPolicy.RETRY); + taskDef1.setRetryDelaySeconds(10); + metadataService.registerTaskDef(Arrays.asList(taskDef3)); + Map input = new HashMap<>(); String workflowId = startOrLoadWorkflowExecution(DO_WHILE_WF + "_2", 1, "looptest", input, null, null); System.out.println("testDoWhile.wfid=" + workflowId); @@ -819,6 +827,18 @@ public void testDoWhileTwoIteration() throws Exception { task = workflowExecutionService.poll("JOIN", "test"); assertNull(task); + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals("Found " + workflow.getTasks(), RUNNING, workflow.getStatus()); + + task = workflowExecutionService.poll("HTTP", "test"); + assertNotNull(task); + assertTrue(task.getReferenceTaskName().endsWith(TaskUtils.getLoopOverTaskRefNameSuffix(task.getIteration()))); + assertTrue(workflowExecutionService.ackTaskReceived(task.getTaskId())); + + task.setStatus(COMPLETED); + workflowExecutionService.updateTask(task); + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); assertNotNull(workflow); assertEquals("Found " + workflow.getTasks(), WorkflowStatus.COMPLETED, workflow.getStatus()); @@ -1784,7 +1804,7 @@ private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputPa TaskDef taskDef = new TaskDef(); taskDef.setName("loopTask"); - taskDef.setTimeoutSeconds(2); + taskDef.setTimeoutSeconds(200); taskDef.setRetryCount(1); taskDef.setTimeoutPolicy(TimeoutPolicy.RETRY); taskDef.setRetryDelaySeconds(10); @@ -1836,6 +1856,32 @@ private void createDoWhileWorkflowWithIteration(int iteration, boolean isInputPa } workflowDef.getTasks().add(loopTask); + + if (iteration == 2 && isInputParameter == false) { + TaskDef taskDef2 = new TaskDef(); + taskDef2.setName("loopTask2"); + taskDef2.setTimeoutSeconds(200); + taskDef2.setRetryCount(3); + taskDef2.setTimeoutPolicy(TimeoutPolicy.RETRY); + taskDef2.setRetryDelaySeconds(10); + + metadataService.registerTaskDef(Arrays.asList(taskDef2)); + WorkflowTask loopTask2 = new WorkflowTask(); + loopTask2.setType(TaskType.DO_WHILE.name()); + loopTask2.setTaskReferenceName("loopTask2"); + loopTask2.setName("loopTask2"); + loopTask2.setWorkflowTaskType(TaskType.DO_WHILE); + loopTask2.setInputParameters(input); + WorkflowTask http3 = new WorkflowTask(); + http3.setName("http3"); + http3.setInputParameters(inputParams1); + http3.setTaskReferenceName("http3"); + http3.setWorkflowTaskType(TaskType.HTTP); + loopTask2.setLoopCondition("if ($.loopTask2['iteration'] < 1) { true; } else { false; }"); + loopTask2.getLoopOver().add(http3); + workflowDef.getTasks().add(loopTask2); + } + metadataService.registerWorkflowDef(workflowDef); } diff --git a/test-harness/src/test/resources/integration/scenarios/legacy/DoWhileTest_2.json b/test-harness/src/test/resources/integration/scenarios/legacy/DoWhileTest_2.json index 027a8e5c33..65f1f6d254 100644 --- a/test-harness/src/test/resources/integration/scenarios/legacy/DoWhileTest_2.json +++ b/test-harness/src/test/resources/integration/scenarios/legacy/DoWhileTest_2.json @@ -57,10 +57,6 @@ "correlationId": "looptest", "pollCount": 0, "taskDefName": "DO_WHILE", - "scheduledTime": 1534983505159, - "startTime": 1534983505161, - "endTime": 1534983505164, - "updateTime": 1534983505164, "startDelayInSeconds": 0, "retried": false, "executed": false, @@ -135,6 +131,92 @@ "rateLimitPerSecond": 0, "taskStatus": "SCHEDULED", "queueWaitTime": 2 + },{ + "taskType": "HTTP", + "status": "SCHEDULED", + "inputData": { + "p1": null, + "p2": null + }, + "referenceTaskName": "http3__1", + "iteration": 1, + "retryCount": 0, + "seq": 2, + "correlationId": "looptest", + "pollCount": 0, + "taskDefName": "http0", + "scheduledTime": 1534983505160, + "startTime": 0, + "endTime": 0, + "updateTime": 1534983505160, + "startDelayInSeconds": 0, + "retried": false, + "executed": false, + "callbackFromWorker": true, + "responseTimeoutSeconds": 3600, + "workflowInstanceId": "WORKFLOW_INSTANCE_ID", + "workflowType": "DoWhileTest_2", + "taskId": "feb6c6c2-0974-4e78-b4df-df4389c46aea", + "callbackAfterSeconds": 0, + "workflowTask": { + "name": "http3__1", + "taskReferenceName": "http3__1", + "iteration": 1, + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "HTTP", + "startDelay": 0, + "optional": false + }, + "rateLimitPerSecond": 0, + "taskStatus": "SCHEDULED", + "queueWaitTime": 0 + }, + { + "taskType": "DO_WHILE", + "status": "IN_PROGRESS", + "referenceTaskName": "loopTask2", + "iteration": 2, + "retryCount": 0, + "seq": 11, + "correlationId": "looptest", + "pollCount": 0, + "taskDefName": "DO_WHILE", + "scheduledTime": 1534983505159, + "startDelayInSeconds": 0, + "retried": false, + "executed": false, + "callbackFromWorker": true, + "responseTimeoutSeconds": 0, + "workflowInstanceId": "WORKFLOW_INSTANCE_ID", + "workflowType": "DoWhileTest_2", + "taskId": "3e66a03d-08ef-4e71-8588-49d6c82bc75d", + "callbackAfterSeconds": 0, + "workflowTask": { + "taskReferenceName": "loopTask2", + "iteration": 1, + "name": "loopTask2", + "type": "DO_WHILE", + "loopCondition": "if ($.loopTask2['iteration'] < 1 ) { true;} else {false;} ", + "loopOver": [ + { + "name": "http3__1", + "taskReferenceName": "http3__1", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "HTTP", + "startDelay": 0, + "optional": false + } + ], + "startDelay": 0, + "optional": false + }, + "rateLimitPerSecond": 0, + "taskStatus": "SCHEDULED", + "queueWaitTime": 2 } ], "workflowType": "DoWhileTest_2",