Skip to content

Commit

Permalink
Merge pull request #180 from conductor-oss/sub_workflow_sync
Browse files Browse the repository at this point in the history
Change Sub workflow to sync task
  • Loading branch information
v1r3n committed Jun 19, 2024
2 parents c5d3838 + d481f6c commit 6d81255
Show file tree
Hide file tree
Showing 16 changed files with 76 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor work
workflowExecutor.terminateWorkflow(subWorkflow, reason, null);
}

@Override
public boolean isAsync() {
return true;
}

/**
* Keep Subworkflow task asyncComplete. The Subworkflow task will be executed once
* asynchronously to move to IN_PROGRESS state, and will move to termination by Subworkflow's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void testCancelWithoutWorkflowId() {

@Test
public void testIsAsync() {
assertTrue(subWorkflow.isAsync());
assertFalse(subWorkflow.isAsync());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,9 @@ class DoWhileSpec extends AbstractSpecification {
tasks[5].taskType == 'JOIN'
tasks[5].status == Task.Status.COMPLETED
tasks[6].taskType == 'SUB_WORKFLOW'
tasks[6].status == Task.Status.SCHEDULED
tasks[6].status == Task.Status.IN_PROGRESS
}

when: "the sub workflow is started by issuing a system task call"
def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1__1').taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)

then: "verify that the sub workflow task is in a IN PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ class DynamicForkJoinSpec extends AbstractSpecification {
tasks[1].taskType == 'FORK'
tasks[1].status == Task.Status.COMPLETED
tasks[2].taskType == 'SUB_WORKFLOW'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'integration_task_10'
tasks[3].status == Task.Status.SCHEDULED
tasks[4].taskType == 'JOIN'
Expand All @@ -446,9 +446,6 @@ class DynamicForkJoinSpec extends AbstractSpecification {

when: "the subworkflow is started by issuing a system task call"
def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId
List<String> polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200)
String subworkflowTaskId = polledTaskIds.get(0)
asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId)

then: "verify that the sub workflow task is in a IN_PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,15 +448,14 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
tasks[0].outputData.isEmpty()

tasks[1].taskType == TaskType.SUB_WORKFLOW.name()
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[1].inputData.isEmpty()

}

when: "the subworkflow is started by issuing a system task call"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = workflow.getTaskByRefName('swt').taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)

then: "verify that the sub workflow task is in a IN_PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down Expand Up @@ -756,7 +755,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
tasks[2].taskType == 'SUB_WORKFLOW'
!tasks[2].inputData

tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'JOIN'
tasks[3].status == Task.Status.IN_PROGRESS
tasks[3].referenceTaskName == 'dynamicfanouttask_join'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,9 @@ class FailureWorkflowSpec extends AbstractSpecification {
def workflowInstanceId = startWorkflow(PARENT_WORKFLOW_WITH_FAILURE_TASK, 1,
'', workflowInput, null)

then: "verify that the workflow has started and the tasks are as expected"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 2
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'LAMBDA'
tasks[0].referenceTaskName == 'lambdaTask1'
tasks[0].seq == 1
tasks[1].status == Task.Status.SCHEDULED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].seq == 2
}

when: "subworkflow is retrieved"
then: "verify that the sub workflow has failed"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = workflow.getTaskByRefName("test_task_failed_sub_wf").getTaskId()
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)
workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowId = workflow.getTaskByRefName("test_task_failed_sub_wf").subWorkflowId

then: "verify that the sub workflow has failed"
with(workflowExecutionService.getExecutionStatus(subWorkflowId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[5].inputData['joinOn'] == ['t14', 't20']

tasks[6].taskType == 'SUB_WORKFLOW'
tasks[6].status == Task.Status.SCHEDULED
tasks[6].status == Task.Status.IN_PROGRESS
tasks[7].taskType == 'JOIN'
tasks[7].status == Task.Status.IN_PROGRESS
tasks[7].inputData['joinOn'] == ['t11', 'join2', 'sw1']
Expand Down Expand Up @@ -905,7 +905,7 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[5].inputData['joinOn'] == ['t14', 't20']

tasks[6].taskType == 'SUB_WORKFLOW'
tasks[6].status == Task.Status.SCHEDULED
tasks[6].status == Task.Status.IN_PROGRESS
tasks[7].taskType == 'JOIN'
tasks[7].status == Task.Status.IN_PROGRESS
tasks[7].inputData['joinOn'] == ['t11', 'join2', 'sw1']
Expand All @@ -923,8 +923,6 @@ class ForkJoinSpec extends AbstractSpecification {

and: "Get the sub workflow id associated with the SubWorkflow Task sw1 and start the system task"
def workflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = workflow.getTaskByRefName("sw1").getTaskId()
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)
def updatedWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowInstanceId = updatedWorkflow.getTaskByRefName('sw1').subWorkflowId

Expand Down Expand Up @@ -1079,19 +1077,15 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'SUB_WORKFLOW'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.IN_PROGRESS
tasks[3].taskType == 'JOIN'
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "both the sub workflows are started by issuing a system task call"
def workflowWithScheduledSubWorkflows = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId1 = workflowWithScheduledSubWorkflows.getTaskByRefName('st1').taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId1)
def subWorkflowTaskId2 = workflowWithScheduledSubWorkflows.getTaskByRefName('st2').taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId2)
def joinTaskId = workflowWithScheduledSubWorkflows.getTaskByRefName("fanouttask_join").taskId

then: "verify that the sub workflow tasks are in a IN PROGRESS state"
Expand Down Expand Up @@ -1212,7 +1206,7 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == 'JOIN'
Expand All @@ -1224,7 +1218,6 @@ class ForkJoinSpec extends AbstractSpecification {
def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId
def jointaskId = parentWorkflow.getTaskByRefName("fanouttask_join").taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)

then: "verify that the sub workflow task is in a IN_PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down Expand Up @@ -1283,13 +1276,12 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[3].inputData['joinOn'] == ['st1', 't2']
tasks[3].status == Task.Status.IN_PROGRESS
tasks[4].taskType == 'SUB_WORKFLOW'
tasks[4].status == Task.Status.SCHEDULED
tasks[4].status == Task.Status.IN_PROGRESS
}

when: "the sub workflow is started by issuing a system task call"
parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true)
subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId
asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId)

then: "verify that the sub workflow task is in a IN PROGRESS state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -100,10 +100,6 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
then: "verify that the 'integration_task_2' was polled and acknowledged"
verifyPolledAndAcknowledgedTask(pollAndCompleteTask)

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
List<String> polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])

then: "verify that the 'sub_workflow_task' is in a IN_PROGRESS state"
def rootWorkflowInstance = workflowExecutionService.getExecutionStatus(rootWorkflowId, true)
with(rootWorkflowInstance) {
Expand All @@ -119,19 +115,17 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
}

and: "poll and complete the integration_task_2 task in the mid-level workflow"
and: "poll and complete the integration_task_2 task in the root-level workflow"
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def midLevelWorkflowInstance = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)

then: "verify that the leaf workflow is RUNNING, and first task is in SCHEDULED state"
Expand Down Expand Up @@ -221,7 +215,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -231,11 +225,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
when: "poll and complete the integration_task_2 task in the root workflow"
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new mid level workflow is created and is in RUNNING state"
newMidLevelWorkflowId != midLevelWorkflowId
Expand All @@ -245,21 +235,17 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].status == Task.Status.COMPLETED
tasks[3].taskType == TASK_TYPE_JOIN
tasks[3].status == Task.Status.IN_PROGRESS
}

when: "poll and complete the integration_task_2 task in the mid-level workflow"
when: "poll and complete the integration_task_2 task in the root-level workflow"
def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "poll and execute the sub workflow task"
polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down Expand Up @@ -323,7 +309,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
tasks[0].taskType == TASK_TYPE_FORK
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW
tasks[1].status == Task.Status.SCHEDULED
tasks[1].status == Task.Status.IN_PROGRESS
tasks[2].taskType == 'integration_task_2'
tasks[2].status == Task.Status.SCHEDULED
tasks[3].taskType == TASK_TYPE_JOIN
Expand All @@ -349,11 +335,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification {
def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId
workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done'])

and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call"
def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200)
asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0])
def newLeafWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId
def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTasks().get(1).subWorkflowId

then: "verify that a new leaf workflow is created and is in RUNNING state"
newLeafWorkflowId != leafWorkflowId
Expand Down
Loading

0 comments on commit 6d81255

Please sign in to comment.