From e39c3d241a569fa2778d33c3c3957b9be773b4bc Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Tue, 4 Aug 2020 12:59:10 -0700 Subject: [PATCH] extend rate limit feature to simple tasks; fix start time for lambda task --- .../execution/mapper/LambdaTaskMapper.java | 64 ++++++------ .../execution/mapper/SimpleTaskMapper.java | 29 +++--- .../netflix/conductor/metrics/Monitors.java | 46 ++++----- .../conductor/service/ExecutionService.java | 10 +- .../test/util/WorkflowTestUtil.groovy | 9 +- .../test/TaskLimitsWorkflowSpec.groovy | 99 ++++++++++++++++--- ...simple_task_workflow_integration_test.json | 26 +++++ ...ystem_task_workflow_integration_test.json} | 2 +- 8 files changed, 190 insertions(+), 95 deletions(-) create mode 100644 test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json rename test-harness/src/test/resources/{rate_limited_task_workflow_integration_test.json => rate_limited_system_task_workflow_integration_test.json} (92%) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java index 4271745436..103415bbfc 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java @@ -1,19 +1,15 @@ -/** - * Copyright 2018 Netflix, Inc. +/* + * Copyright 2020 Netflix, Inc. *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ - package com.netflix.conductor.core.execution.mapper; import com.netflix.conductor.common.metadata.tasks.Task; @@ -21,51 +17,51 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.tasks.Lambda; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author x-ultra */ public class LambdaTaskMapper implements TaskMapper { - public static final Logger logger = LoggerFactory.getLogger(LambdaTaskMapper.class); - private ParametersUtils parametersUtils; + public static final Logger LOGGER = LoggerFactory.getLogger(LambdaTaskMapper.class); + private final ParametersUtils parametersUtils; public LambdaTaskMapper(ParametersUtils parametersUtils) { this.parametersUtils = parametersUtils; } - @Override public List getMappedTasks(TaskMapperContext taskMapperContext) { - logger.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext); + LOGGER.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext); WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule(); Workflow workflowInstance = taskMapperContext.getWorkflowInstance(); String taskId = taskMapperContext.getTaskId(); - Map taskInput = parametersUtils.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null); + Map taskInput = parametersUtils + .getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null); - Task task = new Task(); - task.setTaskType(Lambda.TASK_NAME); - task.setTaskDefName(taskMapperContext.getTaskToSchedule().getName()); - task.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName()); - task.setWorkflowInstanceId(workflowInstance.getWorkflowId()); - task.setWorkflowType(workflowInstance.getWorkflowName()); - task.setCorrelationId(workflowInstance.getCorrelationId()); - task.setScheduledTime(System.currentTimeMillis()); - task.setInputData(taskInput); - task.setTaskId(taskId); - task.setStatus(Task.Status.IN_PROGRESS); - task.setWorkflowTask(taskToSchedule); - task.setWorkflowPriority(workflowInstance.getPriority()); + Task lambdaTask = new Task(); + lambdaTask.setTaskType(Lambda.TASK_NAME); + lambdaTask.setTaskDefName(taskMapperContext.getTaskToSchedule().getName()); + lambdaTask.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName()); + lambdaTask.setWorkflowInstanceId(workflowInstance.getWorkflowId()); + lambdaTask.setWorkflowType(workflowInstance.getWorkflowName()); + lambdaTask.setCorrelationId(workflowInstance.getCorrelationId()); + lambdaTask.setStartTime(System.currentTimeMillis()); + lambdaTask.setScheduledTime(System.currentTimeMillis()); + lambdaTask.setInputData(taskInput); + lambdaTask.setTaskId(taskId); + lambdaTask.setStatus(Task.Status.IN_PROGRESS); + lambdaTask.setWorkflowTask(taskToSchedule); + lambdaTask.setWorkflowPriority(workflowInstance.getPriority()); - return Arrays.asList(task); + return Collections.singletonList(lambdaTask); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java index 559c979897..21f1f40a5c 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java @@ -1,20 +1,15 @@ -/** - * Copyright 2018 Netflix, Inc. +/* + * Copyright 2020 Netflix, Inc. *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ - - package com.netflix.conductor.core.execution.mapper; import com.netflix.conductor.common.metadata.tasks.Task; @@ -25,14 +20,12 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.TerminateWorkflowException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link TaskType#SIMPLE} @@ -42,7 +35,7 @@ public class SimpleTaskMapper implements TaskMapper { public static final Logger logger = LoggerFactory.getLogger(SimpleTaskMapper.class); - private ParametersUtils parametersUtils; + private final ParametersUtils parametersUtils; public SimpleTaskMapper(ParametersUtils parametersUtils) { this.parametersUtils = parametersUtils; @@ -91,6 +84,8 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) throws Ter simpleTask.setWorkflowTask(taskToSchedule); simpleTask.setRetriedTaskId(retriedTaskId); simpleTask.setWorkflowPriority(workflowInstance.getPriority()); + simpleTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); + simpleTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds()); return Collections.singletonList(simpleTask); } } diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 7278bbdb3f..686b706f48 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -1,17 +1,14 @@ /* * Copyright 2020 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.conductor.metrics; @@ -21,6 +18,7 @@ import com.netflix.servo.monitor.BasicStopwatch; import com.netflix.servo.monitor.Stopwatch; import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.Gauge; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; @@ -30,7 +28,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * @author Viren @@ -38,18 +35,17 @@ */ public class Monitors { - private static Registry registry = Spectator.globalRegistry(); + private static final Registry registry = Spectator.globalRegistry(); - private static Map, Counter>> counters = new ConcurrentHashMap<>(); + private static final Map, Counter>> counters = new ConcurrentHashMap<>(); - private static Map, PercentileTimer>> timers = new ConcurrentHashMap<>(); + private static final Map, PercentileTimer>> timers = new ConcurrentHashMap<>(); - private static Map, AtomicLong>> gauges = new ConcurrentHashMap<>(); + private static final Map, Gauge>> gauges = new ConcurrentHashMap<>(); public static final String classQualifier = "WorkflowMonitor"; private Monitors() { - } /** @@ -91,10 +87,10 @@ private static void counter(String className, String name, String... additionalT * @param additionalTags */ private static void gauge(String className, String name, long measurement, String... additionalTags) { - getGauge(className, name, additionalTags).getAndSet(measurement); + getGauge(className, name, additionalTags).set(measurement); } - public static Timer getTimer(String className, String name, String... additionalTags) { + private static Timer getTimer(String className, String name, String... additionalTags) { Map tags = toMap(className, additionalTags); return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> { Id id = registry.createId(name, tags); @@ -111,12 +107,12 @@ private static Counter getCounter(String className, String name, String... addit }); } - private static AtomicLong getGauge(String className, String name, String... additionalTags) { + private static Gauge getGauge(String className, String name, String... additionalTags) { Map tags = toMap(className, additionalTags); return gauges.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> { Id id = registry.createId(name, tags); - return registry.gauge(id, new AtomicLong(0)); + return registry.gauge(id); }); } @@ -291,7 +287,7 @@ public static void recordESIndexTime(String action, String docType, long val) { } public static void recordWorkerQueueSize(String queueType, int val) { - getGauge(Monitors.classQualifier, "indexing_worker_queue", "queueType", queueType).set(val); + gauge(Monitors.classQualifier, "indexing_worker_queue", val, "queueType", queueType); } public static void recordDiscardedIndexingCount(String queueType) { @@ -311,7 +307,7 @@ public static void recordWorkflowArchived(String workflowType, WorkflowStatus st } public static void recordArchivalDelayQueueSize(int val) { - getGauge(classQualifier, "workflow_archival_delay_queue_size").set(val); + gauge(classQualifier, "workflow_archival_delay_queue_size", val); } public static void recordDiscardedArchivalCount() { counter(classQualifier, "discarded_archival_count"); @@ -322,6 +318,6 @@ public static void recordSystemTaskWorkerPollingLimited(String queueName) { } public static void recordEventQueuePollSize(String queueType, int val) { - getGauge(Monitors.classQualifier, "event_queue_poll", "queueType", queueType).set(val); + gauge(Monitors.classQualifier, "event_queue_poll", val, "queueType", queueType); } } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index e518296570..6dc67cf50b 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -17,6 +17,7 @@ import com.netflix.conductor.common.metadata.tasks.PollData; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; @@ -143,11 +144,18 @@ public List poll(String taskType, String workerId, String domain, int coun } if (executionDAOFacade.exceedsInProgressLimit(task)) { - // Postpone a message, so that it would be available for poll again. + // Postpone this message, so that it would be available for poll again. queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds); logger.debug("Postponed task: {} in queue: {} by {} seconds", taskId, queueName, queueTaskMessagePostponeSeconds); continue; } + TaskDef taskDef = task.getTaskDefinition().isPresent() ? task.getTaskDefinition().get() : null; + if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, taskDef)) { + // Postpone this message, so that it would be available for poll again. + queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds); + logger.debug("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency()); + continue; + } task.setStatus(Status.IN_PROGRESS); if (task.getStartTime() == 0) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy index c4528f9e4c..d0db05719c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy @@ -136,9 +136,14 @@ class WorkflowTestUtil { TaskDef rateLimitedTask = new TaskDef() rateLimitedTask.name = 'test_task_with_rateLimits' - rateLimitedTask.rateLimitFrequencyInSeconds = 600 + rateLimitedTask.rateLimitFrequencyInSeconds = 10 rateLimitedTask.rateLimitPerFrequency = 1 + TaskDef rateLimitedSimpleTask = new TaskDef() + rateLimitedSimpleTask.name = 'test_simple_task_with_rateLimits' + rateLimitedSimpleTask.rateLimitFrequencyInSeconds = 10 + rateLimitedSimpleTask.rateLimitPerFrequency = 1 + TaskDef eventTaskX = new TaskDef() eventTaskX.name = 'eventX' eventTaskX.timeoutSeconds = 1 @@ -146,7 +151,7 @@ class WorkflowTestUtil { metadataService.registerTaskDef( [taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask, subWorkflowTask, waitTimeOutTask, userTask, eventTaskX, - rateLimitedTask, concurrentExecutionLimitedTask] + rateLimitedTask, rateLimitedSimpleTask, concurrentExecutionLimitedTask] ) } diff --git a/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy index 9b66dfa525..65263074c3 100644 --- a/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy @@ -28,6 +28,8 @@ import spock.lang.Specification import javax.inject.Inject +import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask + @ModulesForTesting([TestModule.class]) class TaskLimitsWorkflowSpec extends Specification { @@ -43,13 +45,17 @@ class TaskLimitsWorkflowSpec extends Specification { @Inject QueueDAO queueDAO - def RATE_LIMITED_WORKFLOW = 'test_rate_limit_task_workflow' + @Inject + UserTask userTask + def RATE_LIMITED_SYSTEM_TASK_WORKFLOW = 'test_rate_limit_system_task_workflow' + def RATE_LIMITED_SIMPLE_TASK_WORKFLOW = 'test_rate_limit_simple_task_workflow' def CONCURRENCY_EXECUTION_LIMITED_WORKFLOW = 'test_concurrency_limits_workflow' def setup() { workflowTestUtil.registerWorkflows( - 'rate_limited_task_workflow_integration_test.json', + 'rate_limited_system_task_workflow_integration_test.json', + 'rate_limited_simple_task_workflow_integration_test.json', 'concurrency_limited_task_workflow_integration_test.json' ) } @@ -58,9 +64,9 @@ class TaskLimitsWorkflowSpec extends Specification { workflowTestUtil.clearWorkflows() } - def "Verify that the rate limiting of the tasks is honored"() { - when: "Start a workflow that has a rate limited task in it" - def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_WORKFLOW, 1, + def "Verify that the rate limiting for system tasks is honored"() { + when: "Start a workflow that has a rate limited system task in it" + def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SYSTEM_TASK_WORKFLOW, 1, '', [:], null, null, null) then: "verify that the workflow is in a running state" @@ -73,7 +79,6 @@ class TaskLimitsWorkflowSpec extends Specification { when: "Execute the user task" def scheduledTask1 = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[0] - def userTask = new UserTask() workflowExecutor.executeSystemTask(userTask, scheduledTask1.taskId, 30) then: "Verify the state of the workflow is completed" @@ -85,7 +90,7 @@ class TaskLimitsWorkflowSpec extends Specification { } when: "A new instance of the workflow is started" - def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_WORKFLOW, 1, + def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SYSTEM_TASK_WORKFLOW, 1, '', [:], null, null, null) then: "verify that the workflow is in a running state" @@ -107,7 +112,73 @@ class TaskLimitsWorkflowSpec extends Specification { tasks[0].taskType == 'USER_TASK' tasks[0].status == Task.Status.SCHEDULED } + } + + def "Verify that the rate limiting for simple tasks is honored"() { + when: "Start a workflow that has a rate limited simple task in it" + def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SIMPLE_TASK_WORKFLOW, 1, '', [:], null, + null, null) + + then: "verify that the workflow is in a running state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].taskType == 'test_simple_task_with_rateLimits' + tasks[0].status == Task.Status.SCHEDULED + } + + when: "polling and completing the task" + Tuple polledAndCompletedTask = workflowTestUtil.pollAndCompleteTask('test_simple_task_with_rateLimits', 'rate.limit.test.worker') + + then: "verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndCompletedTask) + + and: "the workflow is completed" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 1 + tasks[0].taskType == 'test_simple_task_with_rateLimits' + tasks[0].status == Task.Status.COMPLETED + } + + when: "A new instance of the workflow is started" + def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SIMPLE_TASK_WORKFLOW, 1, + '', [:], null, null, null) + then: "verify that the workflow is in a running state" + with(workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].taskType == 'test_simple_task_with_rateLimits' + tasks[0].status == Task.Status.SCHEDULED + } + + when: "polling for the task" + def polledTask = workflowExecutionService.poll('test_simple_task_with_rateLimits', 'rate.limit.test.worker') + + then: "verify that no task is returned" + !polledTask + + when: "sleep for 10 seconds to ensure rate limit duration is past" + Thread.sleep(10000L) + + and: "the task offset time is reset to ensure that a task is returned on the next poll" + queueDAO.resetOffsetTime('test_simple_task_with_rateLimits', + workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true).tasks[0].taskId) + + and: "polling and completing the task" + polledAndCompletedTask = workflowTestUtil.pollAndCompleteTask('test_simple_task_with_rateLimits', 'rate.limit.test.worker') + + then: "verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndCompletedTask) + + and: "the workflow is completed" + with(workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 1 + tasks[0].taskType == 'test_simple_task_with_rateLimits' + tasks[0].status == Task.Status.COMPLETED + } } def "Verify that concurrency limited tasks are honored during workflow execution"() { @@ -128,7 +199,7 @@ class TaskLimitsWorkflowSpec extends Specification { def polledTask1 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker') def ackPolledTask1 = workflowExecutionService.ackTaskReceived(polledTask1.taskId) - then:"Verify that the task was polled and acknowledged" + then: "Verify that the task was polled and acknowledged" polledTask1.taskType == 'test_task_with_concurrency_limit' polledTask1.workflowInstanceId == workflowInstanceId ackPolledTask1 @@ -148,10 +219,10 @@ class TaskLimitsWorkflowSpec extends Specification { when: "The task is polled" def polledTaskTry1 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker') - then:"Verify that there is no task returned" + then: "Verify that there is no task returned" !polledTaskTry1 - when:"The task that was polled and acknowledged is completed" + when: "The task that was polled and acknowledged is completed" polledTask1.status = Task.Status.COMPLETED workflowExecutionService.updateTask(polledTask1) @@ -159,7 +230,7 @@ class TaskLimitsWorkflowSpec extends Specification { queueDAO.resetOffsetTime('test_task_with_concurrency_limit', workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true).tasks[0].taskId) - then:"Verify that the first workflow is in a completed state" + then: "Verify that the first workflow is in a completed state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 1 @@ -167,15 +238,13 @@ class TaskLimitsWorkflowSpec extends Specification { tasks[0].status == Task.Status.COMPLETED } - and:"The task is polled again and acknowledged" + and: "The task is polled again and acknowledged" def polledTaskTry2 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker') def ackPolledTaskTry2 = workflowExecutionService.ackTaskReceived(polledTaskTry2.taskId) - then:"Verify that the task is returned since there are no tasks in progress" + then: "Verify that the task is returned since there are no tasks in progress" polledTaskTry2.taskType == 'test_task_with_concurrency_limit' polledTaskTry2.workflowInstanceId == workflowTwoInstanceId ackPolledTaskTry2 - } - } diff --git a/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json b/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json new file mode 100644 index 0000000000..61295599e3 --- /dev/null +++ b/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json @@ -0,0 +1,26 @@ +{ + "name" : "test_rate_limit_simple_task_workflow", + "version" : 1, + "tasks" : [ { + "name" : "test_simple_task_with_rateLimits", + "taskReferenceName" : "test_simple_task_with_rateLimits", + "inputParameters" : { }, + "type" : "SIMPLE", + "decisionCases" : { }, + "defaultCase" : [ ], + "forkTasks" : [ ], + "startDelay" : 0, + "joinOn" : [ ], + "optional" : false, + "defaultExclusiveJoinTask" : [ ], + "asyncComplete" : false, + "loopOver" : [ ] + } ], + "inputParameters" : [ ], + "outputParameters" : { }, + "schemaVersion" : 2, + "restartable" : true, + "workflowStatusListenerEnabled" : false, + "timeoutPolicy" : "ALERT_ONLY", + "timeoutSeconds" : 0 +} \ No newline at end of file diff --git a/test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json b/test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json similarity index 92% rename from test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json rename to test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json index 08ce757898..465d880339 100644 --- a/test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json +++ b/test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json @@ -1,5 +1,5 @@ { - "name" : "test_rate_limit_task_workflow", + "name" : "test_rate_limit_system_task_workflow", "version" : 1, "tasks" : [ { "name" : "test_task_with_rateLimits",