diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index 826fbc95ff..684ca051fc 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -108,7 +108,7 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) { if(StringUtils.isEmpty(workflowId)) { return; } - Workflow subWorkflow = provider.getWorkflow(workflowId, false); + Workflow subWorkflow = provider.getWorkflow(workflowId, true); subWorkflow.setStatus(WorkflowStatus.TERMINATED); provider.terminateWorkflow(subWorkflow, "Parent workflow has been terminated with status " + workflow.getStatus(), null); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java index 670385795f..764e02ef04 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java @@ -220,7 +220,7 @@ public void testCancelWithWorkflowId() { when(workflowExecutor.startWorkflow(eq("UnitWorkFlow"), eq(2), eq(inputData), eq(null), any(), any(), any(), eq(null), eq(null))) .thenReturn("workflow_1"); - when(workflowExecutor.getWorkflow(eq("sub-workflow-id"), eq(false))) + when(workflowExecutor.getWorkflow(eq("sub-workflow-id"), eq(true))) .thenReturn(subWorkflowInstance); workflowInstance.setStatus(Workflow.WorkflowStatus.TIMED_OUT); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java index 9a01c71813..8e9c74f4f0 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java @@ -81,6 +81,7 @@ import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED; import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT; import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.RUNNING; +import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.TERMINATED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -139,6 +140,8 @@ public abstract class AbstractWorkflowServiceTest { private static final String LINEAR_WORKFLOW_T1_T2_SW = "junit_test_wf_sw"; + private static final String WORKFLOW_MULTI_LEVEL_SW = "junit_test_multi_level_sw"; + private static final String LONG_RUNNING = "longRunningWf"; private static final String TEST_WORKFLOW_NAME_3 = "junit_test_wf3"; @@ -1594,13 +1597,63 @@ public void testSimpleWorkflow() { assertEquals("task1.Done", workflow.getOutput().get("o3")); } + @Test + public void testTerminateMultiLevelWorkflow() { + createWorkflowWthMultiLevelSubWorkflows(); + + Map workflowInput = new HashMap<>(); + workflowInput.put("param1", "p1 value"); + workflowInput.put("param2", "p2 value"); + String workflowId = startOrLoadWorkflowExecution(WORKFLOW_MULTI_LEVEL_SW, 1, "", workflowInput, null, null); + assertNotNull(workflowId); + + Workflow workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); + assertEquals(1, workflow.getTasks().size()); + + String level1SubWorkflowId = workflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString(); + Workflow level1SubWorkflow = workflowExecutionService.getExecutionStatus(level1SubWorkflowId, true); + assertNotNull(level1SubWorkflow); + assertEquals(RUNNING, level1SubWorkflow.getStatus()); + assertEquals(1, level1SubWorkflow.getTasks().size()); + + String level2SubWorkflowId = level1SubWorkflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString(); + Workflow level2SubWorkflow = workflowExecutionService.getExecutionStatus(level2SubWorkflowId, true); + assertNotNull(level2SubWorkflow); + assertEquals(RUNNING, level2SubWorkflow.getStatus()); + assertEquals(1, level2SubWorkflow.getTasks().size()); + + String level3SubWorkflowId = level2SubWorkflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString(); + Workflow level3SubWorkflow = workflowExecutionService.getExecutionStatus(level3SubWorkflowId, true); + assertNotNull(level3SubWorkflow); + assertEquals(RUNNING, level3SubWorkflow.getStatus()); + assertEquals(1, level3SubWorkflow.getTasks().size()); + assertEquals("junit_task_3", level3SubWorkflow.getTasks().get(0).getTaskType()); + + // terminate the top-level parent workflow + workflowExecutor.terminateWorkflow(workflow.getWorkflowId(), "terminate_test"); + + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertEquals(TERMINATED, workflow.getStatus()); + + level1SubWorkflow = workflowExecutionService.getExecutionStatus(level1SubWorkflowId, true); + assertEquals(TERMINATED, level1SubWorkflow.getStatus()); + + level2SubWorkflow = workflowExecutionService.getExecutionStatus(level2SubWorkflowId, true); + assertEquals(TERMINATED, level2SubWorkflow.getStatus()); + + level3SubWorkflow = workflowExecutionService.getExecutionStatus(level3SubWorkflowId, true); + assertEquals(TERMINATED, level3SubWorkflow.getStatus()); + } + @Test public void testSimpleWorkflowWithResponseTimeout() throws Exception { createWFWithResponseTimeout(); String correlationId = "unit_test_1"; - Map workflowInput = new HashMap(); + Map workflowInput = new HashMap<>(); String inputParam1 = "p1 value"; workflowInput.put("param1", inputParam1); workflowInput.put("param2", "p2 value"); @@ -2452,7 +2505,7 @@ private void validate(String wfid, String[] sequence, String[] executedTasks, in Workflow workflow = workflowExecutionService.getExecutionStatus(wfid, true); assertNotNull(workflow); - assertTrue(!workflow.getTasks().isEmpty()); + assertFalse(workflow.getTasks().isEmpty()); if (i < sequence.length - 1) { assertEquals(RUNNING, workflow.getStatus()); } else { @@ -4197,7 +4250,7 @@ public void testEventWorkflow() { Task eventTask = workflow.getTasks().get(0); assertEquals(TaskType.EVENT.name(), eventTask.getTaskType()); assertEquals(COMPLETED, eventTask.getStatus()); - assertTrue(!eventTask.getOutputData().isEmpty()); + assertFalse(eventTask.getOutputData().isEmpty()); assertNotNull(eventTask.getOutputData().get("event_produced")); Task task = workflowExecutionService.poll("junit_task_1", "test"); @@ -5058,6 +5111,79 @@ private void createWFWithResponseTimeout() { metadataService.updateWorkflowDef(def); } + private void createWorkflowWthMultiLevelSubWorkflows() { + final String subWorkflowLevel1 = "junit_sw_level_1"; + final String subWorkflowLevel2 = "junit_sw_level_2"; + final String subWorkflowLevel3 = "junit_sw_level_3"; + + // level 3 + WorkflowDef workflowDef_level3 = new WorkflowDef(); + workflowDef_level3.setName(subWorkflowLevel3); + workflowDef_level3.setDescription(workflowDef_level3.getName()); + workflowDef_level3.setVersion(1); + workflowDef_level3.setSchemaVersion(2); + + LinkedList workflowTasks_level3 = new LinkedList<>(); + WorkflowTask simpleWorkflowTask = new WorkflowTask(); + simpleWorkflowTask.setName("junit_task_3"); + simpleWorkflowTask.setInputParameters(new HashMap<>()); + simpleWorkflowTask.setTaskReferenceName("t1"); + workflowTasks_level3.add(simpleWorkflowTask); + workflowDef_level3.setTasks(workflowTasks_level3); + + metadataService.updateWorkflowDef(workflowDef_level3); + + // level 2 + WorkflowDef workflowDef_level2 = new WorkflowDef(); + workflowDef_level2.setName(subWorkflowLevel2); + workflowDef_level2.setDescription(workflowDef_level2.getName()); + workflowDef_level2.setVersion(1); + workflowDef_level2.setSchemaVersion(2); + + LinkedList workflowTasks_level2 = new LinkedList<>(); + workflowTasks_level2.add(createSubWorkflowTask(subWorkflowLevel3)); + workflowDef_level2.setTasks(workflowTasks_level2); + + metadataService.updateWorkflowDef(workflowDef_level2); + + // level 1 + WorkflowDef workflowDef_level1 = new WorkflowDef(); + workflowDef_level1.setName(subWorkflowLevel1); + workflowDef_level1.setDescription(workflowDef_level1.getName()); + workflowDef_level1.setVersion(1); + workflowDef_level1.setSchemaVersion(2); + + LinkedList workflowTasks_level1 = new LinkedList<>(); + workflowTasks_level1.add(createSubWorkflowTask(subWorkflowLevel2)); + workflowDef_level1.setTasks(workflowTasks_level1); + + metadataService.updateWorkflowDef(workflowDef_level1); + + // top-level parent workflow + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName(WORKFLOW_MULTI_LEVEL_SW); + workflowDef.setDescription(workflowDef.getName()); + workflowDef.setVersion(1); + workflowDef.setInputParameters(Arrays.asList("param1", "param2")); + workflowDef.setSchemaVersion(2); + + LinkedList workflowTasks = new LinkedList<>(); + workflowTasks.add(createSubWorkflowTask(subWorkflowLevel1)); + workflowDef.setTasks(workflowTasks); + + metadataService.updateWorkflowDef(workflowDef); + } + + private WorkflowTask createSubWorkflowTask(String subWorkflowName) { + WorkflowTask subWorkflowTask = new WorkflowTask(); + subWorkflowTask.setType(TaskType.SUB_WORKFLOW.name()); + SubWorkflowParams subWorkflowParams = new SubWorkflowParams(); + subWorkflowParams.setName(subWorkflowName); + subWorkflowTask.setSubWorkflowParam(subWorkflowParams); + subWorkflowTask.setTaskReferenceName(subWorkflowName + "_task"); + return subWorkflowTask; + } + private String runWorkflowWithSubworkflow() { clearWorkflows(); createWorkflowDefForDomain(); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowLegacyMigrationTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowLegacyMigrationTest.java index d47f8823d5..c013a0b4a9 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowLegacyMigrationTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowLegacyMigrationTest.java @@ -126,12 +126,18 @@ public void testForkJoinNestedWithSubWorkflow() { @Ignore @Test @Override - public void testTerminateTaskWithFailedStatus() {} + public void testTerminateTaskWithFailedStatus() { + } @Ignore @Test @Override - public void testTerminateTaskWithCompletedStatus() {} - + public void testTerminateTaskWithCompletedStatus() { + } + @Ignore + @Test + @Override + public void testTerminateMultiLevelWorkflow() { + } }