Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
external payload storage specs and features
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jul 13, 2020
1 parent b086bdc commit f541674
Show file tree
Hide file tree
Showing 5 changed files with 898 additions and 16 deletions.
Expand Up @@ -32,6 +32,8 @@ import javax.annotation.PostConstruct
import javax.inject.Inject
import javax.inject.Singleton

import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED

/**
* This is a helper class used to initialize task definitions required by the tests when loaded up.
* The task definitions that are loaded up in {@link WorkflowTestUtil#taskDefinitions()} method as part of the post construct of the bean.
Expand Down Expand Up @@ -120,7 +122,15 @@ class WorkflowTestUtil {
waitTimeOutTask.timeoutPolicy = TaskDef.TimeoutPolicy.RETRY
waitTimeOutTask.retryDelaySeconds = 10

metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask, subWorkflowTask, waitTimeOutTask])
TaskDef userTask = new TaskDef()
userTask.setName("user_task")
userTask.setTimeoutSeconds(20)
userTask.setRetryCount(1)
userTask.setTimeoutPolicy(TaskDef.TimeoutPolicy.RETRY)
userTask.setRetryDelaySeconds(10)

metadataService.registerTaskDef([taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask,
subWorkflowTask, waitTimeOutTask, userTask])
}

/**
Expand Down Expand Up @@ -229,7 +239,7 @@ class WorkflowTestUtil {
Tuple pollAndCompleteTask(String taskName, String workerId, Map<String, Object> outputParams = null, int waitAtEndSeconds = 0) {
def polledIntegrationTask = workflowExecutionService.poll(taskName, workerId)
def ackPolledIntegrationTask = workflowExecutionService.ackTaskReceived(polledIntegrationTask.taskId)
polledIntegrationTask.status = Task.Status.COMPLETED
polledIntegrationTask.status = COMPLETED
if (outputParams) {
outputParams.forEach { k, v ->
polledIntegrationTask.outputData[k] = v
Expand All @@ -239,6 +249,17 @@ class WorkflowTestUtil {
return waitAtEndSecondsAndReturn(waitAtEndSeconds, polledIntegrationTask, ackPolledIntegrationTask)
}

Tuple pollAndCompleteLargePayloadTask(String taskName, String workerId, String outputPayloadPath) {
def polledIntegrationTask = workflowExecutionService.poll(taskName, workerId)
def ackPolledIntegrationTask = workflowExecutionService.ackTaskReceived(polledIntegrationTask.taskId)
polledIntegrationTask.status = COMPLETED
polledIntegrationTask.outputData = null
polledIntegrationTask.externalOutputPayloadStoragePath = outputPayloadPath
polledIntegrationTask.status = COMPLETED
workflowExecutionService.updateTask(polledIntegrationTask)
return new Tuple(polledIntegrationTask, ackPolledIntegrationTask)
}

/**
* A helper method intended to be used in the <tt>then:</tt> block of the spock test feature, ideally intended to be called after either:
* pollAndCompleteTask function or pollAndFailTask function
Expand All @@ -259,4 +280,12 @@ class WorkflowTestUtil {
}
}
}

static void verifyPolledAndAcknowledgedLargePayloadTask(Tuple completedTaskAndAck) {
assert completedTaskAndAck[0] : "The task polled cannot be null"
def polledIntegrationTask = completedTaskAndAck[0] as Task
def ackPolledIntegrationTask = completedTaskAndAck[1] as boolean
assert polledIntegrationTask
assert ackPolledIntegrationTask
}
}

0 comments on commit f541674

Please sign in to comment.