Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1709 from Netflix/simple_workflow_test_cleanup
Browse files Browse the repository at this point in the history
Workflow and task configuration spec and features
  • Loading branch information
pctreddy committed May 26, 2020
2 parents 62c1d16 + 81fe222 commit b2332d0
Show file tree
Hide file tree
Showing 4 changed files with 795 additions and 80 deletions.
Expand Up @@ -123,7 +123,14 @@ class WorkflowTestUtil {
simpleSubWorkflowTask.setName('simple_task_in_sub_wf')
simpleSubWorkflowTask.setRetryCount(0)

metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask])
TaskDef waitTimeOutTask = new TaskDef()
waitTimeOutTask.name = 'waitTimeout'
waitTimeOutTask.timeoutSeconds = 2
waitTimeOutTask.retryCount = 1
waitTimeOutTask.timeoutPolicy = TaskDef.TimeoutPolicy.RETRY
waitTimeOutTask.retryDelaySeconds = 10

metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask, waitTimeOutTask])
}

/**
Expand Down
Expand Up @@ -63,16 +63,11 @@ class SimpleWorkflowSpec extends Specification {
@Shared
def INTEGRATION_TEST_WF_NON_RESTARTABLE = "integration_test_wf_non_restartable"

@Shared
def WORKFLOW_WITH_OPTIONAL_TASK = "optional_task_wf"


def setup() {
//Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK
//Register LINEAR_WORKFLOW_T1_T2, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK
workflowTestUtil.registerWorkflows('simple_workflow_1_integration_test.json',
'simple_workflow_3_integration_test.json',
'simple_workflow_with_resp_time_out_integration_test.json',
'simple_workflow_with_optional_task_integration_test.json')
'simple_workflow_with_resp_time_out_integration_test.json')
}

def cleanup() {
Expand Down Expand Up @@ -970,76 +965,4 @@ class SimpleWorkflowSpec extends Specification {
simpleWorkflowDefinition.restartable = true
metadataService.updateWorkflowDef(simpleWorkflowDefinition)
}

def "Test simple workflow which has an optional task"() {

given:"A input parameters for a workflow with an optional task"
def correlationId = 'integration_test'+UUID.randomUUID().toString()
def workflowInput = new HashMap()
workflowInput['param1'] = 'p1 value'
workflowInput['param2'] = 'p2 value'

when:"An optional task workflow is started"
def workflowInstanceId = workflowExecutor.startWorkflow(WORKFLOW_WITH_OPTIONAL_TASK, 1,
correlationId, workflowInput,
null, null, null)

then:"verify that the workflow has started and the optional task is in a scheduled state"
workflowInstanceId
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
tasks[0].taskType == 'task_optional'
}

when:"The first optional task is polled and failed"
Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_optional',
'task1.integration.worker', 'NETWORK ERROR')

then:"Verify that the task_optional was polled and acknowledged"
verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1)

when:"A decide is executed on the workflow"
workflowExecutor.decide(workflowInstanceId)

then:"verify that the workflow is still running and the first optional task has failed and the retry has kicked in"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 2
tasks[0].status == Task.Status.FAILED
tasks[0].taskType == 'task_optional'
tasks[1].status == Task.Status.SCHEDULED
tasks[1].taskType == 'task_optional'
}

when:"Poll the optional task again and do not complete it and run decide"
workflowExecutionService.poll('task_optional', 'task1.integration.worker')
Thread.sleep(5000)
workflowExecutor.decide(workflowInstanceId)

then:"Ensure that the workflow is updated"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 3
tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS
tasks[1].taskType == 'task_optional'
tasks[2].status == Task.Status.SCHEDULED
tasks[2].taskType == 'integration_task_2'
}

when:"The second task 'integration_task_2' is polled and completed"
def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker')

then:"Verify that the task was polled and acknowledged"
verifyPolledAndAcknowledgedTask(task2Try1)

and:"Ensure that the workflow is in completed state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 3
tasks[2].status == Task.Status.COMPLETED
tasks[2].taskType == 'integration_task_2'
}
}
}

0 comments on commit b2332d0

Please sign in to comment.