diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java
index 4271745436..103415bbfc 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/LambdaTaskMapper.java
@@ -1,19 +1,15 @@
-/**
- * Copyright 2018 Netflix, Inc.
+/*
+ * Copyright 2020 Netflix, Inc.
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
package com.netflix.conductor.core.execution.mapper;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -21,51 +17,51 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.tasks.Lambda;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author x-ultra
*/
public class LambdaTaskMapper implements TaskMapper {
- public static final Logger logger = LoggerFactory.getLogger(LambdaTaskMapper.class);
- private ParametersUtils parametersUtils;
+ public static final Logger LOGGER = LoggerFactory.getLogger(LambdaTaskMapper.class);
+ private final ParametersUtils parametersUtils;
public LambdaTaskMapper(ParametersUtils parametersUtils) {
this.parametersUtils = parametersUtils;
}
-
@Override
public List getMappedTasks(TaskMapperContext taskMapperContext) {
- logger.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext);
+ LOGGER.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext);
WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
Workflow workflowInstance = taskMapperContext.getWorkflowInstance();
String taskId = taskMapperContext.getTaskId();
- Map taskInput = parametersUtils.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null);
+ Map taskInput = parametersUtils
+ .getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null);
- Task task = new Task();
- task.setTaskType(Lambda.TASK_NAME);
- task.setTaskDefName(taskMapperContext.getTaskToSchedule().getName());
- task.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName());
- task.setWorkflowInstanceId(workflowInstance.getWorkflowId());
- task.setWorkflowType(workflowInstance.getWorkflowName());
- task.setCorrelationId(workflowInstance.getCorrelationId());
- task.setScheduledTime(System.currentTimeMillis());
- task.setInputData(taskInput);
- task.setTaskId(taskId);
- task.setStatus(Task.Status.IN_PROGRESS);
- task.setWorkflowTask(taskToSchedule);
- task.setWorkflowPriority(workflowInstance.getPriority());
+ Task lambdaTask = new Task();
+ lambdaTask.setTaskType(Lambda.TASK_NAME);
+ lambdaTask.setTaskDefName(taskMapperContext.getTaskToSchedule().getName());
+ lambdaTask.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName());
+ lambdaTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
+ lambdaTask.setWorkflowType(workflowInstance.getWorkflowName());
+ lambdaTask.setCorrelationId(workflowInstance.getCorrelationId());
+ lambdaTask.setStartTime(System.currentTimeMillis());
+ lambdaTask.setScheduledTime(System.currentTimeMillis());
+ lambdaTask.setInputData(taskInput);
+ lambdaTask.setTaskId(taskId);
+ lambdaTask.setStatus(Task.Status.IN_PROGRESS);
+ lambdaTask.setWorkflowTask(taskToSchedule);
+ lambdaTask.setWorkflowPriority(workflowInstance.getPriority());
- return Arrays.asList(task);
+ return Collections.singletonList(lambdaTask);
}
}
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java
index 559c979897..21f1f40a5c 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SimpleTaskMapper.java
@@ -1,20 +1,15 @@
-/**
- * Copyright 2018 Netflix, Inc.
+/*
+ * Copyright 2020 Netflix, Inc.
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-
-
package com.netflix.conductor.core.execution.mapper;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -25,14 +20,12 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link TaskType#SIMPLE}
@@ -42,7 +35,7 @@ public class SimpleTaskMapper implements TaskMapper {
public static final Logger logger = LoggerFactory.getLogger(SimpleTaskMapper.class);
- private ParametersUtils parametersUtils;
+ private final ParametersUtils parametersUtils;
public SimpleTaskMapper(ParametersUtils parametersUtils) {
this.parametersUtils = parametersUtils;
@@ -91,6 +84,8 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
simpleTask.setWorkflowTask(taskToSchedule);
simpleTask.setRetriedTaskId(retriedTaskId);
simpleTask.setWorkflowPriority(workflowInstance.getPriority());
+ simpleTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
+ simpleTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
return Collections.singletonList(simpleTask);
}
}
diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
index 7278bbdb3f..686b706f48 100644
--- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
+++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
@@ -1,17 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.metrics;
@@ -21,6 +18,7 @@
import com.netflix.servo.monitor.BasicStopwatch;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.spectator.api.Counter;
+import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
@@ -30,7 +28,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
/**
* @author Viren
@@ -38,18 +35,17 @@
*/
public class Monitors {
- private static Registry registry = Spectator.globalRegistry();
+ private static final Registry registry = Spectator.globalRegistry();
- private static Map, Counter>> counters = new ConcurrentHashMap<>();
+ private static final Map, Counter>> counters = new ConcurrentHashMap<>();
- private static Map, PercentileTimer>> timers = new ConcurrentHashMap<>();
+ private static final Map, PercentileTimer>> timers = new ConcurrentHashMap<>();
- private static Map, AtomicLong>> gauges = new ConcurrentHashMap<>();
+ private static final Map, Gauge>> gauges = new ConcurrentHashMap<>();
public static final String classQualifier = "WorkflowMonitor";
private Monitors() {
-
}
/**
@@ -91,10 +87,10 @@ private static void counter(String className, String name, String... additionalT
* @param additionalTags
*/
private static void gauge(String className, String name, long measurement, String... additionalTags) {
- getGauge(className, name, additionalTags).getAndSet(measurement);
+ getGauge(className, name, additionalTags).set(measurement);
}
- public static Timer getTimer(String className, String name, String... additionalTags) {
+ private static Timer getTimer(String className, String name, String... additionalTags) {
Map tags = toMap(className, additionalTags);
return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Id id = registry.createId(name, tags);
@@ -111,12 +107,12 @@ private static Counter getCounter(String className, String name, String... addit
});
}
- private static AtomicLong getGauge(String className, String name, String... additionalTags) {
+ private static Gauge getGauge(String className, String name, String... additionalTags) {
Map tags = toMap(className, additionalTags);
return gauges.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Id id = registry.createId(name, tags);
- return registry.gauge(id, new AtomicLong(0));
+ return registry.gauge(id);
});
}
@@ -291,7 +287,7 @@ public static void recordESIndexTime(String action, String docType, long val) {
}
public static void recordWorkerQueueSize(String queueType, int val) {
- getGauge(Monitors.classQualifier, "indexing_worker_queue", "queueType", queueType).set(val);
+ gauge(Monitors.classQualifier, "indexing_worker_queue", val, "queueType", queueType);
}
public static void recordDiscardedIndexingCount(String queueType) {
@@ -311,7 +307,7 @@ public static void recordWorkflowArchived(String workflowType, WorkflowStatus st
}
public static void recordArchivalDelayQueueSize(int val) {
- getGauge(classQualifier, "workflow_archival_delay_queue_size").set(val);
+ gauge(classQualifier, "workflow_archival_delay_queue_size", val);
}
public static void recordDiscardedArchivalCount() {
counter(classQualifier, "discarded_archival_count");
@@ -322,6 +318,6 @@ public static void recordSystemTaskWorkerPollingLimited(String queueName) {
}
public static void recordEventQueuePollSize(String queueType, int val) {
- getGauge(Monitors.classQualifier, "event_queue_poll", "queueType", queueType).set(val);
+ gauge(Monitors.classQualifier, "event_queue_poll", val, "queueType", queueType);
}
}
diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
index e518296570..6dc67cf50b 100644
--- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
+++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
@@ -17,6 +17,7 @@
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
@@ -143,11 +144,18 @@ public List poll(String taskType, String workerId, String domain, int coun
}
if (executionDAOFacade.exceedsInProgressLimit(task)) {
- // Postpone a message, so that it would be available for poll again.
+ // Postpone this message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds);
logger.debug("Postponed task: {} in queue: {} by {} seconds", taskId, queueName, queueTaskMessagePostponeSeconds);
continue;
}
+ TaskDef taskDef = task.getTaskDefinition().isPresent() ? task.getTaskDefinition().get() : null;
+ if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, taskDef)) {
+ // Postpone this message, so that it would be available for poll again.
+ queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds);
+ logger.debug("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency());
+ continue;
+ }
task.setStatus(Status.IN_PROGRESS);
if (task.getStartTime() == 0) {
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
index c4528f9e4c..d0db05719c 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
@@ -136,9 +136,14 @@ class WorkflowTestUtil {
TaskDef rateLimitedTask = new TaskDef()
rateLimitedTask.name = 'test_task_with_rateLimits'
- rateLimitedTask.rateLimitFrequencyInSeconds = 600
+ rateLimitedTask.rateLimitFrequencyInSeconds = 10
rateLimitedTask.rateLimitPerFrequency = 1
+ TaskDef rateLimitedSimpleTask = new TaskDef()
+ rateLimitedSimpleTask.name = 'test_simple_task_with_rateLimits'
+ rateLimitedSimpleTask.rateLimitFrequencyInSeconds = 10
+ rateLimitedSimpleTask.rateLimitPerFrequency = 1
+
TaskDef eventTaskX = new TaskDef()
eventTaskX.name = 'eventX'
eventTaskX.timeoutSeconds = 1
@@ -146,7 +151,7 @@ class WorkflowTestUtil {
metadataService.registerTaskDef(
[taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask,
subWorkflowTask, waitTimeOutTask, userTask, eventTaskX,
- rateLimitedTask, concurrentExecutionLimitedTask]
+ rateLimitedTask, rateLimitedSimpleTask, concurrentExecutionLimitedTask]
)
}
diff --git a/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy
index 9b66dfa525..65263074c3 100644
--- a/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/counductor/integration/test/TaskLimitsWorkflowSpec.groovy
@@ -28,6 +28,8 @@ import spock.lang.Specification
import javax.inject.Inject
+import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask
+
@ModulesForTesting([TestModule.class])
class TaskLimitsWorkflowSpec extends Specification {
@@ -43,13 +45,17 @@ class TaskLimitsWorkflowSpec extends Specification {
@Inject
QueueDAO queueDAO
- def RATE_LIMITED_WORKFLOW = 'test_rate_limit_task_workflow'
+ @Inject
+ UserTask userTask
+ def RATE_LIMITED_SYSTEM_TASK_WORKFLOW = 'test_rate_limit_system_task_workflow'
+ def RATE_LIMITED_SIMPLE_TASK_WORKFLOW = 'test_rate_limit_simple_task_workflow'
def CONCURRENCY_EXECUTION_LIMITED_WORKFLOW = 'test_concurrency_limits_workflow'
def setup() {
workflowTestUtil.registerWorkflows(
- 'rate_limited_task_workflow_integration_test.json',
+ 'rate_limited_system_task_workflow_integration_test.json',
+ 'rate_limited_simple_task_workflow_integration_test.json',
'concurrency_limited_task_workflow_integration_test.json'
)
}
@@ -58,9 +64,9 @@ class TaskLimitsWorkflowSpec extends Specification {
workflowTestUtil.clearWorkflows()
}
- def "Verify that the rate limiting of the tasks is honored"() {
- when: "Start a workflow that has a rate limited task in it"
- def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_WORKFLOW, 1,
+ def "Verify that the rate limiting for system tasks is honored"() {
+ when: "Start a workflow that has a rate limited system task in it"
+ def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SYSTEM_TASK_WORKFLOW, 1,
'', [:], null, null, null)
then: "verify that the workflow is in a running state"
@@ -73,7 +79,6 @@ class TaskLimitsWorkflowSpec extends Specification {
when: "Execute the user task"
def scheduledTask1 = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).tasks[0]
- def userTask = new UserTask()
workflowExecutor.executeSystemTask(userTask, scheduledTask1.taskId, 30)
then: "Verify the state of the workflow is completed"
@@ -85,7 +90,7 @@ class TaskLimitsWorkflowSpec extends Specification {
}
when: "A new instance of the workflow is started"
- def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_WORKFLOW, 1,
+ def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SYSTEM_TASK_WORKFLOW, 1,
'', [:], null, null, null)
then: "verify that the workflow is in a running state"
@@ -107,7 +112,73 @@ class TaskLimitsWorkflowSpec extends Specification {
tasks[0].taskType == 'USER_TASK'
tasks[0].status == Task.Status.SCHEDULED
}
+ }
+
+ def "Verify that the rate limiting for simple tasks is honored"() {
+ when: "Start a workflow that has a rate limited simple task in it"
+ def workflowInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SIMPLE_TASK_WORKFLOW, 1, '', [:], null,
+ null, null)
+
+ then: "verify that the workflow is in a running state"
+ with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+ status == Workflow.WorkflowStatus.RUNNING
+ tasks.size() == 1
+ tasks[0].taskType == 'test_simple_task_with_rateLimits'
+ tasks[0].status == Task.Status.SCHEDULED
+ }
+
+ when: "polling and completing the task"
+ Tuple polledAndCompletedTask = workflowTestUtil.pollAndCompleteTask('test_simple_task_with_rateLimits', 'rate.limit.test.worker')
+
+ then: "verify that the task was polled and acknowledged"
+ verifyPolledAndAcknowledgedTask(polledAndCompletedTask)
+
+ and: "the workflow is completed"
+ with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+ status == Workflow.WorkflowStatus.COMPLETED
+ tasks.size() == 1
+ tasks[0].taskType == 'test_simple_task_with_rateLimits'
+ tasks[0].status == Task.Status.COMPLETED
+ }
+
+ when: "A new instance of the workflow is started"
+ def workflowTwoInstanceId = workflowExecutor.startWorkflow(RATE_LIMITED_SIMPLE_TASK_WORKFLOW, 1,
+ '', [:], null, null, null)
+ then: "verify that the workflow is in a running state"
+ with(workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true)) {
+ status == Workflow.WorkflowStatus.RUNNING
+ tasks.size() == 1
+ tasks[0].taskType == 'test_simple_task_with_rateLimits'
+ tasks[0].status == Task.Status.SCHEDULED
+ }
+
+ when: "polling for the task"
+ def polledTask = workflowExecutionService.poll('test_simple_task_with_rateLimits', 'rate.limit.test.worker')
+
+ then: "verify that no task is returned"
+ !polledTask
+
+ when: "sleep for 10 seconds to ensure rate limit duration is past"
+ Thread.sleep(10000L)
+
+ and: "the task offset time is reset to ensure that a task is returned on the next poll"
+ queueDAO.resetOffsetTime('test_simple_task_with_rateLimits',
+ workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true).tasks[0].taskId)
+
+ and: "polling and completing the task"
+ polledAndCompletedTask = workflowTestUtil.pollAndCompleteTask('test_simple_task_with_rateLimits', 'rate.limit.test.worker')
+
+ then: "verify that the task was polled and acknowledged"
+ verifyPolledAndAcknowledgedTask(polledAndCompletedTask)
+
+ and: "the workflow is completed"
+ with(workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true)) {
+ status == Workflow.WorkflowStatus.COMPLETED
+ tasks.size() == 1
+ tasks[0].taskType == 'test_simple_task_with_rateLimits'
+ tasks[0].status == Task.Status.COMPLETED
+ }
}
def "Verify that concurrency limited tasks are honored during workflow execution"() {
@@ -128,7 +199,7 @@ class TaskLimitsWorkflowSpec extends Specification {
def polledTask1 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker')
def ackPolledTask1 = workflowExecutionService.ackTaskReceived(polledTask1.taskId)
- then:"Verify that the task was polled and acknowledged"
+ then: "Verify that the task was polled and acknowledged"
polledTask1.taskType == 'test_task_with_concurrency_limit'
polledTask1.workflowInstanceId == workflowInstanceId
ackPolledTask1
@@ -148,10 +219,10 @@ class TaskLimitsWorkflowSpec extends Specification {
when: "The task is polled"
def polledTaskTry1 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker')
- then:"Verify that there is no task returned"
+ then: "Verify that there is no task returned"
!polledTaskTry1
- when:"The task that was polled and acknowledged is completed"
+ when: "The task that was polled and acknowledged is completed"
polledTask1.status = Task.Status.COMPLETED
workflowExecutionService.updateTask(polledTask1)
@@ -159,7 +230,7 @@ class TaskLimitsWorkflowSpec extends Specification {
queueDAO.resetOffsetTime('test_task_with_concurrency_limit',
workflowExecutionService.getExecutionStatus(workflowTwoInstanceId, true).tasks[0].taskId)
- then:"Verify that the first workflow is in a completed state"
+ then: "Verify that the first workflow is in a completed state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 1
@@ -167,15 +238,13 @@ class TaskLimitsWorkflowSpec extends Specification {
tasks[0].status == Task.Status.COMPLETED
}
- and:"The task is polled again and acknowledged"
+ and: "The task is polled again and acknowledged"
def polledTaskTry2 = workflowExecutionService.poll('test_task_with_concurrency_limit', 'test_task_worker')
def ackPolledTaskTry2 = workflowExecutionService.ackTaskReceived(polledTaskTry2.taskId)
- then:"Verify that the task is returned since there are no tasks in progress"
+ then: "Verify that the task is returned since there are no tasks in progress"
polledTaskTry2.taskType == 'test_task_with_concurrency_limit'
polledTaskTry2.workflowInstanceId == workflowTwoInstanceId
ackPolledTaskTry2
-
}
-
}
diff --git a/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json b/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json
new file mode 100644
index 0000000000..61295599e3
--- /dev/null
+++ b/test-harness/src/test/resources/rate_limited_simple_task_workflow_integration_test.json
@@ -0,0 +1,26 @@
+{
+ "name" : "test_rate_limit_simple_task_workflow",
+ "version" : 1,
+ "tasks" : [ {
+ "name" : "test_simple_task_with_rateLimits",
+ "taskReferenceName" : "test_simple_task_with_rateLimits",
+ "inputParameters" : { },
+ "type" : "SIMPLE",
+ "decisionCases" : { },
+ "defaultCase" : [ ],
+ "forkTasks" : [ ],
+ "startDelay" : 0,
+ "joinOn" : [ ],
+ "optional" : false,
+ "defaultExclusiveJoinTask" : [ ],
+ "asyncComplete" : false,
+ "loopOver" : [ ]
+ } ],
+ "inputParameters" : [ ],
+ "outputParameters" : { },
+ "schemaVersion" : 2,
+ "restartable" : true,
+ "workflowStatusListenerEnabled" : false,
+ "timeoutPolicy" : "ALERT_ONLY",
+ "timeoutSeconds" : 0
+}
\ No newline at end of file
diff --git a/test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json b/test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json
similarity index 92%
rename from test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json
rename to test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json
index 08ce757898..465d880339 100644
--- a/test-harness/src/test/resources/rate_limited_task_workflow_integration_test.json
+++ b/test-harness/src/test/resources/rate_limited_system_task_workflow_integration_test.json
@@ -1,5 +1,5 @@
{
- "name" : "test_rate_limit_task_workflow",
+ "name" : "test_rate_limit_system_task_workflow",
"version" : 1,
"tasks" : [ {
"name" : "test_task_with_rateLimits",