
fix subworkflow task def use within fork join
apanicker-nflx committed Jul 9, 2020
1 parent 0bc38a4 commit 23aad0b
Showing 8 changed files with 322 additions and 47 deletions.
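
In short, the change exempts SUB_WORKFLOW tasks from the response-timeout check (their completion is driven by the sub workflow finishing, not by worker polling) and has completed or terminated sub workflows update the parent's SUB_WORKFLOW task directly before decide() is re-run on the parent workflow. Below is a minimal, self-contained Java sketch of the timeout guard only; the classes and fields are hypothetical stand-ins, not Conductor's actual DeciderService types.

import java.util.concurrent.TimeUnit;

// Standalone illustration of the guard this commit adds to isResponseTimedOut:
// terminal tasks and SUB_WORKFLOW tasks are never considered response-timed-out.
public class ResponseTimeoutSketch {

    enum TaskType { SIMPLE, SUB_WORKFLOW }

    // Hypothetical stand-in for a Conductor task; only the fields needed here.
    static class Task {
        TaskType taskType;
        boolean terminal;
        long responseTimeoutSeconds;
        long lastUpdateMillis;
    }

    static boolean isResponseTimedOut(Task task, long nowMillis) {
        // A SUB_WORKFLOW task stays IN_PROGRESS until its sub workflow reaches a
        // terminal state, so a worker-response timeout does not apply to it.
        if (task.terminal || task.taskType == TaskType.SUB_WORKFLOW) {
            return false;
        }
        long elapsed = nowMillis - task.lastUpdateMillis;
        return elapsed > TimeUnit.SECONDS.toMillis(task.responseTimeoutSeconds);
    }

    public static void main(String[] args) {
        Task subWorkflowTask = new Task();
        subWorkflowTask.taskType = TaskType.SUB_WORKFLOW;
        subWorkflowTask.responseTimeoutSeconds = 5;
        // Last updated a minute ago, far beyond the 5 second response timeout.
        subWorkflowTask.lastUpdateMillis = System.currentTimeMillis() - 60_000;
        // Prints false: the sub workflow task is not treated as response-timed-out.
        System.out.println(isResponseTimedOut(subWorkflowTask, System.currentTimeMillis()));
    }
}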
@@ -20,6 +20,7 @@
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.SUB_WORKFLOW;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -594,7 +595,7 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
LOGGER.warn("missing task type : {}, workflowId= {}", task.getTaskDefName(), task.getWorkflowInstanceId());
return false;
}
if (task.getStatus().isTerminal()) {
if (task.getStatus().isTerminal() || task.getTaskType().equals(SUB_WORKFLOW.name())) {
return false;
}

@@ -667,8 +667,8 @@ void completeWorkflow(Workflow wf) {
executionDAOFacade.updateTasks(workflow.getTasks());
LOGGER.debug("Completed workflow execution for {}", workflow.getWorkflowId());

// If the following task, for some reason fails, the sweep will take care of this again!
if (StringUtils.isNotEmpty(workflow.getParentWorkflowId())) {
updateParentWorkflowTask(workflow);
decide(workflow.getParentWorkflowId());
}
Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), workflow.getOwnerApp());
@@ -738,9 +738,8 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
}
}

// If the following lines, for some reason fails, the sweep will take
// care of this again!
if (workflow.getParentWorkflowId() != null) {
updateParentWorkflowTask(workflow);
decide(workflow.getParentWorkflowId());
}

@@ -1001,20 +1000,14 @@ public boolean decide(String workflowId) {
completeWorkflow(workflow);
} else {
workflow.setStatus(workflowInstance.getStatus());
terminate(workflow, new TerminateWorkflowException("Workflow is FAILED by TERMINATE task: " + task.getTaskId()));
terminate(workflow, new TerminateWorkflowException(
"Workflow is FAILED by TERMINATE task: " + task.getTaskId()));
}
return true;
}
deciderService.externalizeTaskData(task);
tasksToBeUpdated.add(task);
stateChanged = true;
} else if (SUB_WORKFLOW.name().equals(task.getTaskType()) && task.getStatus().equals(IN_PROGRESS)) {
// Verifies and updates the task inplace, based on the Subworkflow and parent Workflow state,
// and continues with the current decide.
if (updateParentWorkflow(task, workflow)) {
tasksToBeUpdated.add(task);
stateChanged = true;
}
}
}
}
@@ -1568,11 +1561,6 @@ private boolean updateParentWorkflow(Workflow subWorkflow) {
return updateParentWorkflow(subWorkflowTask, subWorkflow, parentWorkflow);
}

private boolean updateParentWorkflow(Task subWorkflowTask, Workflow parentWorkflow) {
Workflow subWorkflow = executionDAOFacade.getWorkflowById(subWorkflowTask.getSubWorkflowId(), false);
return updateParentWorkflow(subWorkflowTask, subWorkflow, parentWorkflow);
}

/**
* Update parent Workflow based on Subworkflow state.
* Updates the provided subWorkflowTask and/or parentWorkflow inplace, where applicable.
@@ -1642,4 +1630,18 @@ protected boolean updateParentWorkflow(Task subWorkflowTask, Workflow subWorkflo
}
return false;
}
}

@VisibleForTesting
void updateParentWorkflowTask(Workflow subWorkflow) {
SubWorkflow subWorkflowSystemTask = new SubWorkflow();
Task subWorkflowTask = executionDAOFacade.getTaskById(subWorkflow.getParentWorkflowTaskId());
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
if (subWorkflowTask.getStatus().isTerminal() && subWorkflowTask.getExternalOutputPayloadStoragePath() != null && !subWorkflowTask.getOutputData().isEmpty()) {
Map<String, Object> parentWorkflowTaskOutputData = subWorkflowTask.getOutputData();
deciderService.populateTaskData(subWorkflowTask);
subWorkflowTask.getOutputData().putAll(parentWorkflowTaskOutputData);
deciderService.externalizeTaskData(subWorkflowTask);
}
executionDAOFacade.updateTask(subWorkflowTask);
}
}
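
The least obvious step in updateParentWorkflowTask above is the output handling when the parent's SUB_WORKFLOW task stores its output externally: the freshly produced output is kept aside, the stored payload is loaded back via populateTaskData, the fresh output is merged on top, and the result is externalized again. A standalone sketch of that merge order, using hypothetical helpers rather than Conductor's DeciderService:

import java.util.HashMap;
import java.util.Map;

public class OutputMergeSketch {

    // Hypothetical stand-in for the payload that populateTaskData would load
    // back from external storage.
    static Map<String, Object> loadExternalPayload() {
        Map<String, Object> stored = new HashMap<>();
        stored.put("previousKey", "previousValue");
        return stored;
    }

    public static void main(String[] args) {
        // Output produced just now by executing the SubWorkflow system task.
        Map<String, Object> freshOutput = new HashMap<>();
        freshOutput.put("subWorkflowId", "wf-123");

        // Load what was stored externally, then merge the fresh output on top,
        // so newer keys win while previously stored keys are preserved.
        Map<String, Object> merged = loadExternalPayload();
        merged.putAll(freshOutput);

        // Contains both previousKey and subWorkflowId.
        System.out.println(merged);
    }
}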
@@ -866,9 +866,14 @@ public void testIsResponseTimedOut() {
task.setTaskDefName("test_rt");
task.setStatus(Status.IN_PROGRESS);
task.setTaskId("aa");
task.setTaskType(TaskType.TASK_TYPE_SIMPLE);
task.setUpdateTime(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(11));

assertTrue(deciderService.isResponseTimedOut(taskDef, task));

// verify that sub workflow tasks are not response timed out
task.setTaskType(TaskType.TASK_TYPE_SUB_WORKFLOW);
assertFalse(deciderService.isResponseTimedOut(taskDef, task));
}

@Test
@@ -1236,6 +1236,31 @@ public void testResetCallbacksForWorkflowTasks() {
verify(queueDAO, times(1)).resetOffsetTime(anyString(), anyString());
}

@Test
public void testUpdateParentWorkflowTask() {
String parentWorkflowTaskId = "parent_workflow_task_id";
String workflowId = "workflow_id";

Workflow subWorkflow = new Workflow();
subWorkflow.setWorkflowId(workflowId);
subWorkflow.setParentWorkflowTaskId(parentWorkflowTaskId);
subWorkflow.setStatus(WorkflowStatus.COMPLETED);

Task subWorkflowTask = new Task();
subWorkflowTask.setSubWorkflowId(workflowId);
subWorkflowTask.setStatus(Status.IN_PROGRESS);
subWorkflowTask.setExternalOutputPayloadStoragePath(null);

when(executionDAOFacade.getTaskById(parentWorkflowTaskId)).thenReturn(subWorkflowTask);
when(executionDAOFacade.getWorkflowById(workflowId, false)).thenReturn(subWorkflow);

workflowExecutor.updateParentWorkflowTask(subWorkflow);
ArgumentCaptor<Task> argumentCaptor = ArgumentCaptor.forClass(Task.class);
verify(executionDAOFacade, times(1)).updateTask(argumentCaptor.capture());
assertEquals(Status.COMPLETED, argumentCaptor.getAllValues().get(0).getStatus());
assertEquals(workflowId, argumentCaptor.getAllValues().get(0).getSubWorkflowId());
}

private Workflow generateSampleWorkflow() {
//setup
Workflow workflow = new Workflow();
@@ -32,22 +32,6 @@ import javax.annotation.PostConstruct
import javax.inject.Inject
import javax.inject.Singleton

/**
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This is a helper class used to initialize task definitions required by the tests when loaded up.
* The task definitions that are loaded up in {@link WorkflowTestUtil#taskDefinitions()} method as part of the post construct of the bean.
@@ -123,14 +107,20 @@ class WorkflowTestUtil {
simpleSubWorkflowTask.setName('simple_task_in_sub_wf')
simpleSubWorkflowTask.setRetryCount(0)

TaskDef subWorkflowTask = new TaskDef()
subWorkflowTask.setName('sub_workflow_task')
subWorkflowTask.setRetryCount(1)
subWorkflowTask.setResponseTimeoutSeconds(5)
subWorkflowTask.setRetryDelaySeconds(0)

TaskDef waitTimeOutTask = new TaskDef()
waitTimeOutTask.name = 'waitTimeout'
waitTimeOutTask.timeoutSeconds = 2
waitTimeOutTask.retryCount = 1
waitTimeOutTask.timeoutPolicy = TaskDef.TimeoutPolicy.RETRY
waitTimeOutTask.retryDelaySeconds = 10

metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask, waitTimeOutTask])
metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask, subWorkflowTask, waitTimeOutTask])
}

/**
@@ -58,6 +58,9 @@ class ForkJoinSpec extends Specification {
@Shared
def WORKFLOW_FORK_JOIN_OPTIONAL_SW = "integration_test_fork_join_optional_sw"

@Shared
def FORK_JOIN_SUB_WORKFLOW = 'integration_test_fork_join_sw'

def cleanup() {
workflowTestUtil.clearWorkflows()
}
@@ -69,7 +72,8 @@ class ForkJoinSpec extends Specification {
'simple_workflow_1_integration_test.json',
'nested_fork_join_with_sub_workflow_integration_test.json',
'simple_one_task_sub_workflow_integration_test.json',
'fork_join_with_optional_sub_workflow_forks_integration_test.json'
'fork_join_with_optional_sub_workflow_forks_integration_test.json',
'fork_join_sub_workflow.json'
)
}

@@ -854,4 +858,181 @@
}
}

def "Test fork join with sub workflow task using task definition"() {
given: "A input to the workflow that has fork with sub workflow task"
Map workflowInput = new HashMap<String, Object>()
workflowInput['param1'] = 'p1 value'
workflowInput['param2'] = 'p2 value'

when: "A workflow that has fork with sub workflow task is started"
def workflowInstanceId = workflowExecutor.startWorkflow(FORK_JOIN_SUB_WORKFLOW, 1, '', workflowInput, null,
null, null)

then: "verify that the workflow is in a RUNNING state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.SCHEDULED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "the subworkflow is started by issuing a system task call"
def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId
workflowExecutor.executeSystemTask(new SubWorkflow(), subWorkflowTaskId, 1)

then: "verify that the sub workflow task is in a IN_PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 4
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "sub workflow is retrieved"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowInstanceId = workflow.getTaskByRefName('st1').subWorkflowId

then: "verify that the sub workflow is in a RUNNING state"
with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
tasks[0].taskType == 'simple_task_in_sub_wf'
}

when: "the 'simple_task_in_sub_wf' belonging to the sub workflow is polled and failed"
def polledAndFailSubWorkflowTask = workflowTestUtil.pollAndFailTask('simple_task_in_sub_wf',
'task1.worker', 'Failed....')

then: "verify that the task was polled and failed"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndFailSubWorkflowTask)

and: "verify that the sub workflow is in failed state"
with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 1
tasks[0].status == Task.Status.FAILED
tasks[0].taskType == 'simple_task_in_sub_wf'
}

and: "verify that the workflow is in a RUNNING state and sub workflow task is retried"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 5
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.FAILED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].status == Task.Status.SCHEDULED
}

when: "the sub workflow is started by issuing a system task call"
parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId
workflowExecutor.executeSystemTask(new SubWorkflow(), subWorkflowTaskId, 1)

then: "verify that the sub workflow task is in a IN PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 5
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.FAILED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].status == Task.Status.IN_PROGRESS
}

when: "sub workflow is retrieved"
workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
subWorkflowInstanceId = workflow.getTaskByRefName('st1').subWorkflowId

then: "verify that the sub workflow is in a RUNNING state"
with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
tasks[0].taskType == 'simple_task_in_sub_wf'
}

when: "the 'simple_task_in_sub_wf' belonging to the sub workflow is polled and completed"
def polledAndCompletedSubWorkflowTask = workflowTestUtil.pollAndCompleteTask('simple_task_in_sub_wf', 'subworkflow.task.worker')

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSubWorkflowTask)

and: "verify that the sub workflow is in COMPLETED state"
with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 1
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'simple_task_in_sub_wf'
}

and: "verify that the workflow is in a RUNNING state and sub workflow task is completed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 5
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.FAILED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].status == Task.Status.COMPLETED
}

when: "the simple task is polled and completed"
def polledAndCompletedSimpleTask = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker')

then: "verify that the task was polled and acknowledged"
workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSimpleTask)

and: "verify that the workflow is in a COMPLETED state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 5
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.FAILED
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == 'JOIN'
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.COMPLETED
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].status == Task.Status.COMPLETED
}
}
}
