Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Updating Subworflow logic to enable async mode execution for start wi…
Browse files Browse the repository at this point in the history
…th delays, and subworkflow task status to reflect the actual workflow state.

Added subworkflowId to Task object, instead of keeping it in input/output.
  • Loading branch information
kishorebanala committed Feb 24, 2020
1 parent 86b2678 commit 7a181b3
Show file tree
Hide file tree
Showing 10 changed files with 421 additions and 234 deletions.
Expand Up @@ -190,7 +190,7 @@ public boolean isRetriable() {

@ProtoField(id = 36)
private int workflowPriority;

@ProtoField(id = 37)
private String executionNameSpace;

Expand All @@ -200,6 +200,9 @@ public boolean isRetriable() {
@ProtoField(id = 40)
private int iteration;

@ProtoField(id = 41)
private String subWorkflowId;

public Task() {
}

Expand Down Expand Up @@ -733,6 +736,15 @@ public void setWorkflowPriority(int workflowPriority) {
this.workflowPriority = workflowPriority;
}

public String getSubWorkflowId() {
return subWorkflowId;
}

public void setSubWorkflowId(String subWorkflowId) {
this.subWorkflowId = subWorkflowId;
}


public Task copy() {
Task copy = new Task();
copy.setCallbackAfterSeconds(callbackAfterSeconds);
Expand Down Expand Up @@ -763,6 +775,7 @@ public Task copy() {
copy.setIteration(iteration);
copy.setExecutionNameSpace(executionNameSpace);
copy.setIsolationGroupId(isolationGroupId);
copy.setSubWorkflowId(subWorkflowId);

return copy;
}
Expand Down
Expand Up @@ -69,6 +69,8 @@ public enum Status {

private String externalOutputPayloadStoragePath;

private String subWorkflowId;

public TaskResult(Task task) {
this.workflowInstanceId = task.getWorkflowInstanceId();
this.taskId = task.getTaskId();
Expand All @@ -77,6 +79,7 @@ public TaskResult(Task task) {
this.workerId = task.getWorkerId();
this.outputData = task.getOutputData();
this.externalOutputPayloadStoragePath = task.getExternalOutputPayloadStoragePath();
this.subWorkflowId = task.getSubWorkflowId();
switch (task.getStatus()) {
case CANCELED:
case COMPLETED_WITH_ERRORS:
Expand Down Expand Up @@ -240,6 +243,14 @@ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStor
this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
}

public String getSubWorkflowId() {
return subWorkflowId;
}

public void setSubWorkflowId(String subWorkflowId) {
this.subWorkflowId = subWorkflowId;
}

@Override
public String toString() {
return "TaskResult{" +
Expand Down
Expand Up @@ -98,7 +98,7 @@ public void testTaskQueueWaitTime() {
public void testDeepCopyTask() {
final Task task = new Task();
// In order to avoid forgetting putting inside the copy method the newly added fields check the number of declared fields.
final int expectedTaskFieldsNumber = 39;
final int expectedTaskFieldsNumber = 40;
final int declaredFieldsNumber = task.getClass().getDeclaredFields().length;

assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber);
Expand Down Expand Up @@ -137,6 +137,7 @@ public void testDeepCopyTask() {
task.setRetried(false);
task.setReasonForIncompletion("");
task.setWorkerId("");
task.setSubWorkflowId("");

final Task copy = task.deepCopy();
assertEquals(task, copy);
Expand Down
Expand Up @@ -24,7 +24,6 @@
import static com.netflix.conductor.core.execution.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.NOT_FOUND;
import static com.netflix.conductor.core.execution.tasks.SubWorkflow.SUB_WORKFLOW_ID;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -824,6 +823,7 @@ public void updateTask(TaskResult taskResult) {
task.setWorkerId(taskResult.getWorkerId());
task.setCallbackAfterSeconds(taskResult.getCallbackAfterSeconds());
task.setOutputData(taskResult.getOutputData());
task.setSubWorkflowId(taskResult.getSubWorkflowId());

if (task.getOutputData() != null) {
deciderService.externalizeTaskData(task);
Expand Down Expand Up @@ -1383,7 +1383,7 @@ void rollbackTasks(String workflowId, List<Task> createdTasks) {
createdTasks.forEach(task -> new RetryUtil<>().retryOnException(() ->
{
if (task.getTaskType().equals(SUB_WORKFLOW.name())) {
executionDAOFacade.removeWorkflow((String) task.getOutputData().get(SUB_WORKFLOW_ID), false);
executionDAOFacade.removeWorkflow(task.getSubWorkflowId(), false);
}
executionDAOFacade.removeTask(task.getTaskId());
return null;
Expand Down Expand Up @@ -1454,7 +1454,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
} else {
// If not found look into sub workflows
if (task.getTaskType().equalsIgnoreCase(SubWorkflow.NAME)) {
String subWorkflowId = task.getInputData().get(SUB_WORKFLOW_ID).toString();
String subWorkflowId = task.getSubWorkflowId();
if (rerunWF(subWorkflowId, taskId, taskInput, null, null)) {
rerunFromTask = task;
break;
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.core.execution.Code;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,7 +40,6 @@ public class SubWorkflow extends WorkflowSystemTask {

private static final Logger logger = LoggerFactory.getLogger(SubWorkflow.class);
public static final String NAME = "SUB_WORKFLOW";
public static final String SUB_WORKFLOW_ID = "subWorkflowId";

public SubWorkflow() {
super(NAME);
Expand All @@ -63,9 +64,27 @@ public void start(Workflow workflow, Task task, WorkflowExecutor provider) {

try {
String subWorkflowId = provider.startWorkflow(name, version, wfInput, null, correlationId, workflow.getWorkflowId(), task.getTaskId(), null, taskToDomain);
task.getOutputData().put(SUB_WORKFLOW_ID, subWorkflowId);
task.getInputData().put(SUB_WORKFLOW_ID, subWorkflowId);
task.setStatus(Status.IN_PROGRESS);

task.setSubWorkflowId(subWorkflowId);

// Set task status based on current sub-workflow status, as the status can change in recursion by the time we update here.
Workflow subWorkflow = provider.getWorkflow(subWorkflowId, false);
switch (subWorkflow.getStatus()) {
case RUNNING:
case PAUSED:
task.setStatus(Status.IN_PROGRESS);
break;
case COMPLETED:
task.setStatus(Status.COMPLETED);
break;
case FAILED:
case TIMED_OUT:
case TERMINATED:
task.setStatus(Status.FAILED);
break;
default:
throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, "Subworkflow status does not conform to relevant task status.");
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
task.setReasonForIncompletion(e.getMessage());
Expand All @@ -75,7 +94,7 @@ public void start(Workflow workflow, Task task, WorkflowExecutor provider) {

@Override
public boolean execute(Workflow workflow, Task task, WorkflowExecutor provider) {
String workflowId = getSubWorkflowId(task);
String workflowId = task.getSubWorkflowId();
if(StringUtils.isEmpty(workflowId)) {
return false;
}
Expand All @@ -101,7 +120,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor provider)

@Override
public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) {
String workflowId = getSubWorkflowId(task);
String workflowId = task.getSubWorkflowId();
if(StringUtils.isEmpty(workflowId)) {
return;
}
Expand All @@ -112,16 +131,6 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) {

@Override
public boolean isAsync() {
return false;
}

private String getSubWorkflowId(Task task) {
String subWorkflowId = (String) task.getOutputData().get(SUB_WORKFLOW_ID);
if (subWorkflowId == null) {
subWorkflowId = Optional.ofNullable(task.getInputData())
.map(data -> (String)data.get(SUB_WORKFLOW_ID)) //Backward compatibility
.orElse("");
}
return subWorkflowId;
return true;
}
}
Expand Up @@ -15,7 +15,6 @@
*/
package com.netflix.conductor.core.execution;

import static com.netflix.conductor.core.execution.tasks.SubWorkflow.SUB_WORKFLOW_ID;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.maxBy;
Expand Down Expand Up @@ -1000,7 +999,7 @@ public void testRollbackTasks() {
task3.setRetryCount(0);
task3.setWorkflowTask(subWorkflowTask);
task3.setOutputData(new HashMap<>());
task3.getOutputData().put(SUB_WORKFLOW_ID, IDGenerator.generate());
task3.setSubWorkflowId(IDGenerator.generate());

AtomicInteger removeWorkflowCalledCounter = new AtomicInteger(0);
doAnswer(invocation -> {
Expand Down
Expand Up @@ -9,12 +9,12 @@
import org.junit.Before;
import org.junit.Test;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -46,12 +46,29 @@ public void testStartSubWorkflow() {
inputData.put("subWorkflowVersion", 3);
task.setInputData(inputData);

String workflowId = "workflow_1";
Workflow workflow = new Workflow();
workflow.setWorkflowId(workflowId);

when(workflowExecutor.startWorkflow(eq("UnitWorkFlow"), eq(3), eq(inputData), eq(null), any(), any(), any(), eq(null), any()))
.thenReturn("workflow_1");
.thenReturn(workflowId);

when(workflowExecutor.getWorkflow(anyString(), eq(false))).thenReturn(workflow);

workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID));
assertEquals("workflow_1", task.getSubWorkflowId());
assertEquals(Task.Status.IN_PROGRESS, task.getStatus());

workflow.setStatus(Workflow.WorkflowStatus.TERMINATED);
subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getSubWorkflowId());
assertEquals(Task.Status.FAILED, task.getStatus());

workflow.setStatus(Workflow.WorkflowStatus.COMPLETED);
subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getSubWorkflowId());
assertEquals(Task.Status.COMPLETED, task.getStatus());
}

@Test
Expand All @@ -75,7 +92,7 @@ public void testStartSubWorkflowWithEmptyWorkflowInput() {
.thenReturn("workflow_1");

subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID));
assertEquals("workflow_1", task.getSubWorkflowId());
}

@Test
Expand All @@ -100,7 +117,7 @@ public void testStartSubWorkflowWithWorkflowInput() {
.thenReturn("workflow_1");

subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID));
assertEquals("workflow_1", task.getSubWorkflowId());
}

@Test
Expand All @@ -123,7 +140,7 @@ public void testStartSubWorkflowTaskToDomain() {
.thenReturn("workflow_1");

subWorkflow.start(workflowInstance, task, workflowExecutor);
assertEquals("workflow_1", task.getOutputData().get(SubWorkflow.SUB_WORKFLOW_ID));
assertEquals("workflow_1", task.getSubWorkflowId());
}

@Test
Expand Down Expand Up @@ -156,8 +173,8 @@ public void testExecuteWorkflowStatus() {

Task task = new Task();
Map<String, Object> outputData = new HashMap<>();
outputData.put(SubWorkflow.SUB_WORKFLOW_ID, "sub-workflow-id");
task.setOutputData(outputData);
task.setSubWorkflowId("sub-workflow-id");

Map<String, Object> inputData = new HashMap<>();
inputData.put("subWorkflowName", "UnitWorkFlow");
Expand Down Expand Up @@ -213,8 +230,8 @@ public void testCancelWithWorkflowId() {

Task task = new Task();
Map<String, Object> outputData = new HashMap<>();
outputData.put(SubWorkflow.SUB_WORKFLOW_ID, "sub-workflow-id");
task.setOutputData(outputData);
task.setSubWorkflowId("sub-workflow-id");

Map<String, Object> inputData = new HashMap<>();
inputData.put("subWorkflowName", "UnitWorkFlow");
Expand Down Expand Up @@ -261,6 +278,6 @@ public void testCancelWithoutWorkflowId() {
@Test
public void testIsAsync() {
SubWorkflow subWorkflow = new SubWorkflow();
assertFalse(subWorkflow.isAsync());
assertTrue(subWorkflow.isAsync());
}
}
Expand Up @@ -557,6 +557,9 @@ public TaskPb.Task toProto(Task from) {
to.setIsolationGroupId( from.getIsolationGroupId() );
}
to.setIteration( from.getIteration() );
if (from.getSubWorkflowId() != null) {
to.setSubWorkflowId( from.getSubWorkflowId() );
}
return to.build();
}

Expand Down Expand Up @@ -614,6 +617,7 @@ public Task fromProto(TaskPb.Task from) {
to.setExecutionNameSpace( from.getExecutionNameSpace() );
to.setIsolationGroupId( from.getIsolationGroupId() );
to.setIteration( from.getIteration() );
to.setSubWorkflowId( from.getSubWorkflowId() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/task.proto
Expand Up @@ -59,4 +59,5 @@ message Task {
string execution_name_space = 37;
string isolation_group_id = 38;
int32 iteration = 40;
string sub_workflow_id = 41;
}

0 comments on commit 7a181b3

Please sign in to comment.