Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
allow parent workflow termination to terminate subworkflows at all le…
Browse files Browse the repository at this point in the history
…vels
  • Loading branch information
apanicker-nflx committed May 22, 2019
1 parent cade890 commit 8a199bc
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 8 deletions.
Expand Up @@ -108,7 +108,7 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) {
if(StringUtils.isEmpty(workflowId)) {
return;
}
Workflow subWorkflow = provider.getWorkflow(workflowId, false);
Workflow subWorkflow = provider.getWorkflow(workflowId, true);
subWorkflow.setStatus(WorkflowStatus.TERMINATED);
provider.terminateWorkflow(subWorkflow, "Parent workflow has been terminated with status " + workflow.getStatus(), null);
}
Expand Down
Expand Up @@ -220,7 +220,7 @@ public void testCancelWithWorkflowId() {

when(workflowExecutor.startWorkflow(eq("UnitWorkFlow"), eq(2), eq(inputData), eq(null), any(), any(), any(), eq(null), eq(null)))
.thenReturn("workflow_1");
when(workflowExecutor.getWorkflow(eq("sub-workflow-id"), eq(false)))
when(workflowExecutor.getWorkflow(eq("sub-workflow-id"), eq(true)))
.thenReturn(subWorkflowInstance);

workflowInstance.setStatus(Workflow.WorkflowStatus.TIMED_OUT);
Expand Down
Expand Up @@ -81,6 +81,7 @@
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.RUNNING;
import static com.netflix.conductor.common.run.Workflow.WorkflowStatus.TERMINATED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -139,6 +140,8 @@ public abstract class AbstractWorkflowServiceTest {

private static final String LINEAR_WORKFLOW_T1_T2_SW = "junit_test_wf_sw";

private static final String WORKFLOW_MULTI_LEVEL_SW = "junit_test_multi_level_sw";

private static final String LONG_RUNNING = "longRunningWf";

private static final String TEST_WORKFLOW_NAME_3 = "junit_test_wf3";
Expand Down Expand Up @@ -1594,13 +1597,63 @@ public void testSimpleWorkflow() {
assertEquals("task1.Done", workflow.getOutput().get("o3"));
}

@Test
public void testTerminateMultiLevelWorkflow() {
createWorkflowWthMultiLevelSubWorkflows();

Map<String, Object> workflowInput = new HashMap<>();
workflowInput.put("param1", "p1 value");
workflowInput.put("param2", "p2 value");
String workflowId = startOrLoadWorkflowExecution(WORKFLOW_MULTI_LEVEL_SW, 1, "", workflowInput, null, null);
assertNotNull(workflowId);

Workflow workflow = workflowExecutionService.getExecutionStatus(workflowId, true);
assertNotNull(workflow);
assertEquals(RUNNING, workflow.getStatus());
assertEquals(1, workflow.getTasks().size());

String level1SubWorkflowId = workflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString();
Workflow level1SubWorkflow = workflowExecutionService.getExecutionStatus(level1SubWorkflowId, true);
assertNotNull(level1SubWorkflow);
assertEquals(RUNNING, level1SubWorkflow.getStatus());
assertEquals(1, level1SubWorkflow.getTasks().size());

String level2SubWorkflowId = level1SubWorkflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString();
Workflow level2SubWorkflow = workflowExecutionService.getExecutionStatus(level2SubWorkflowId, true);
assertNotNull(level2SubWorkflow);
assertEquals(RUNNING, level2SubWorkflow.getStatus());
assertEquals(1, level2SubWorkflow.getTasks().size());

String level3SubWorkflowId = level2SubWorkflow.getTasks().get(0).getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID).toString();
Workflow level3SubWorkflow = workflowExecutionService.getExecutionStatus(level3SubWorkflowId, true);
assertNotNull(level3SubWorkflow);
assertEquals(RUNNING, level3SubWorkflow.getStatus());
assertEquals(1, level3SubWorkflow.getTasks().size());
assertEquals("junit_task_3", level3SubWorkflow.getTasks().get(0).getTaskType());

// terminate the top-level parent workflow
workflowExecutor.terminateWorkflow(workflow.getWorkflowId(), "terminate_test");

workflow = workflowExecutionService.getExecutionStatus(workflowId, true);
assertEquals(TERMINATED, workflow.getStatus());

level1SubWorkflow = workflowExecutionService.getExecutionStatus(level1SubWorkflowId, true);
assertEquals(TERMINATED, level1SubWorkflow.getStatus());

level2SubWorkflow = workflowExecutionService.getExecutionStatus(level2SubWorkflowId, true);
assertEquals(TERMINATED, level2SubWorkflow.getStatus());

level3SubWorkflow = workflowExecutionService.getExecutionStatus(level3SubWorkflowId, true);
assertEquals(TERMINATED, level3SubWorkflow.getStatus());
}

@Test
public void testSimpleWorkflowWithResponseTimeout() throws Exception {

createWFWithResponseTimeout();

String correlationId = "unit_test_1";
Map<String, Object> workflowInput = new HashMap<String, Object>();
Map<String, Object> workflowInput = new HashMap<>();
String inputParam1 = "p1 value";
workflowInput.put("param1", inputParam1);
workflowInput.put("param2", "p2 value");
Expand Down Expand Up @@ -2452,7 +2505,7 @@ private void validate(String wfid, String[] sequence, String[] executedTasks, in

Workflow workflow = workflowExecutionService.getExecutionStatus(wfid, true);
assertNotNull(workflow);
assertTrue(!workflow.getTasks().isEmpty());
assertFalse(workflow.getTasks().isEmpty());
if (i < sequence.length - 1) {
assertEquals(RUNNING, workflow.getStatus());
} else {
Expand Down Expand Up @@ -4197,7 +4250,7 @@ public void testEventWorkflow() {
Task eventTask = workflow.getTasks().get(0);
assertEquals(TaskType.EVENT.name(), eventTask.getTaskType());
assertEquals(COMPLETED, eventTask.getStatus());
assertTrue(!eventTask.getOutputData().isEmpty());
assertFalse(eventTask.getOutputData().isEmpty());
assertNotNull(eventTask.getOutputData().get("event_produced"));

Task task = workflowExecutionService.poll("junit_task_1", "test");
Expand Down Expand Up @@ -5058,6 +5111,79 @@ private void createWFWithResponseTimeout() {
metadataService.updateWorkflowDef(def);
}

private void createWorkflowWthMultiLevelSubWorkflows() {
final String subWorkflowLevel1 = "junit_sw_level_1";
final String subWorkflowLevel2 = "junit_sw_level_2";
final String subWorkflowLevel3 = "junit_sw_level_3";

// level 3
WorkflowDef workflowDef_level3 = new WorkflowDef();
workflowDef_level3.setName(subWorkflowLevel3);
workflowDef_level3.setDescription(workflowDef_level3.getName());
workflowDef_level3.setVersion(1);
workflowDef_level3.setSchemaVersion(2);

LinkedList<WorkflowTask> workflowTasks_level3 = new LinkedList<>();
WorkflowTask simpleWorkflowTask = new WorkflowTask();
simpleWorkflowTask.setName("junit_task_3");
simpleWorkflowTask.setInputParameters(new HashMap<>());
simpleWorkflowTask.setTaskReferenceName("t1");
workflowTasks_level3.add(simpleWorkflowTask);
workflowDef_level3.setTasks(workflowTasks_level3);

metadataService.updateWorkflowDef(workflowDef_level3);

// level 2
WorkflowDef workflowDef_level2 = new WorkflowDef();
workflowDef_level2.setName(subWorkflowLevel2);
workflowDef_level2.setDescription(workflowDef_level2.getName());
workflowDef_level2.setVersion(1);
workflowDef_level2.setSchemaVersion(2);

LinkedList<WorkflowTask> workflowTasks_level2 = new LinkedList<>();
workflowTasks_level2.add(createSubWorkflowTask(subWorkflowLevel3));
workflowDef_level2.setTasks(workflowTasks_level2);

metadataService.updateWorkflowDef(workflowDef_level2);

// level 1
WorkflowDef workflowDef_level1 = new WorkflowDef();
workflowDef_level1.setName(subWorkflowLevel1);
workflowDef_level1.setDescription(workflowDef_level1.getName());
workflowDef_level1.setVersion(1);
workflowDef_level1.setSchemaVersion(2);

LinkedList<WorkflowTask> workflowTasks_level1 = new LinkedList<>();
workflowTasks_level1.add(createSubWorkflowTask(subWorkflowLevel2));
workflowDef_level1.setTasks(workflowTasks_level1);

metadataService.updateWorkflowDef(workflowDef_level1);

// top-level parent workflow
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName(WORKFLOW_MULTI_LEVEL_SW);
workflowDef.setDescription(workflowDef.getName());
workflowDef.setVersion(1);
workflowDef.setInputParameters(Arrays.asList("param1", "param2"));
workflowDef.setSchemaVersion(2);

LinkedList<WorkflowTask> workflowTasks = new LinkedList<>();
workflowTasks.add(createSubWorkflowTask(subWorkflowLevel1));
workflowDef.setTasks(workflowTasks);

metadataService.updateWorkflowDef(workflowDef);
}

private WorkflowTask createSubWorkflowTask(String subWorkflowName) {
WorkflowTask subWorkflowTask = new WorkflowTask();
subWorkflowTask.setType(TaskType.SUB_WORKFLOW.name());
SubWorkflowParams subWorkflowParams = new SubWorkflowParams();
subWorkflowParams.setName(subWorkflowName);
subWorkflowTask.setSubWorkflowParam(subWorkflowParams);
subWorkflowTask.setTaskReferenceName(subWorkflowName + "_task");
return subWorkflowTask;
}

private String runWorkflowWithSubworkflow() {
clearWorkflows();
createWorkflowDefForDomain();
Expand Down
Expand Up @@ -126,12 +126,18 @@ public void testForkJoinNestedWithSubWorkflow() {
@Ignore
@Test
@Override
public void testTerminateTaskWithFailedStatus() {}
public void testTerminateTaskWithFailedStatus() {
}

@Ignore
@Test
@Override
public void testTerminateTaskWithCompletedStatus() {}

public void testTerminateTaskWithCompletedStatus() {
}

@Ignore
@Test
@Override
public void testTerminateMultiLevelWorkflow() {
}
}

0 comments on commit 8a199bc

Please sign in to comment.