Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Chnaged rate limiting implementation to rely on the current persisted…
Browse files Browse the repository at this point in the history
… task definition rather than the definition on the task
  • Loading branch information
pctreddy committed Dec 2, 2019
1 parent 4b4523a commit 73b158b
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 25 deletions.
Expand Up @@ -1180,7 +1180,7 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int
LOGGER.warn("Concurrent Execution limited for {}:{}", taskId, task.getTaskDefName());
return;
}
if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task)) {
if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, metadataDAO.getTaskDef(task.getTaskDefName()))) {
LOGGER.warn("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency());
return;
}
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.Workflow;
Expand Down Expand Up @@ -398,8 +399,8 @@ public boolean exceedsInProgressLimit(Task task) {
return executionDAO.exceedsInProgressLimit(task);
}

public boolean exceedsRateLimitPerFrequency(Task task) {
return rateLimitingDao.exceedsRateLimitPerFrequency(task);
public boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef) {
return rateLimitingDao.exceedsRateLimitPerFrequency(task, taskDef);
}

public void addTaskExecLog(List<TaskExecLog> logs) {
Expand Down
@@ -1,6 +1,7 @@
package com.netflix.conductor.dao;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;

/**
* An abstraction to enable different Rate Limiting implementations
Expand All @@ -13,5 +14,5 @@ public interface RateLimitingDao {
* @return true: If the {@link Task} is rateLimited
* false: If the {@link Task} is not rateLimited
*/
boolean exceedsRateLimitPerFrequency(Task task);
boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef);
}
Expand Up @@ -146,7 +146,7 @@ public void updateTask(Task task) {
* @return
*/
@Override
public boolean exceedsRateLimitPerFrequency(Task task) {
public boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef) {
return false;
}

Expand Down
Expand Up @@ -146,7 +146,7 @@ public void updateTask(Task task) {
* @return
*/
@Override
public boolean exceedsRateLimitPerFrequency(Task task) {
public boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef) {
return false;
}

Expand Down
Expand Up @@ -3,15 +3,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.dao.RateLimitingDao;
import com.netflix.conductor.dyno.DynoProxy;
import com.netflix.conductor.metrics.Monitors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Optional;

@Singleton
@Trace
Expand All @@ -27,7 +30,8 @@ protected RedisRateLimitingDao(DynoProxy dynoClient, ObjectMapper objectMapper,
}

/**
* This method evaluates if the {@link Task} is rate limited or not based on {@link Task#getRateLimitPerFrequency()}
* This method evaluates if the {@link TaskDef} is rate limited or not based on {@link Task#getRateLimitPerFrequency()}
* and {@link Task#getRateLimitFrequencyInSeconds()} if not checks the {@link Task} is rate limited or not based on {@link Task#getRateLimitPerFrequency()}
* and {@link Task#getRateLimitFrequencyInSeconds()}
*
* The rate limiting is implemented using the Redis constructs of sorted set and TTL of each element in the rate limited bucket.
Expand All @@ -44,16 +48,21 @@ protected RedisRateLimitingDao(DynoProxy dynoClient, ObjectMapper objectMapper,
* false: If the {@link Task} is not rateLimited
*/
@Override
public boolean exceedsRateLimitPerFrequency(Task task) {
int rateLimitPerFrequency = task.getRateLimitPerFrequency();
int rateLimitFrequencyInSeconds = task.getRateLimitFrequencyInSeconds();
public boolean exceedsRateLimitPerFrequency(Task task, TaskDef taskDef) {
//Check if the TaskDefinition is not null then pick the definition values or else pick from the Task
ImmutablePair<Integer, Integer> rateLimitPair = Optional.ofNullable(taskDef)
.map(definition -> new ImmutablePair<>(definition.getRateLimitPerFrequency(), definition.getRateLimitFrequencyInSeconds()))
.orElse(new ImmutablePair<>(task.getRateLimitPerFrequency(), task.getRateLimitFrequencyInSeconds()));

int rateLimitPerFrequency = rateLimitPair.getLeft();
int rateLimitFrequencyInSeconds = rateLimitPair.getRight();
if (rateLimitPerFrequency <= 0 || rateLimitFrequencyInSeconds <=0) {
logger.debug("Rate limit not applied to the Task: {} either rateLimitPerFrequency: {} or rateLimitFrequencyInSeconds: {} is 0 or less",
task, rateLimitPerFrequency, rateLimitFrequencyInSeconds);
return false;
} else {
logger.debug("Evaluating rate limiting for Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {}",
task, rateLimitPerFrequency, rateLimitFrequencyInSeconds);
logger.debug("Evaluating rate limiting for TaskId: {} with TaskDefinition of: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {}",
task.getTaskId(), task.getTaskDefName(),rateLimitPerFrequency, rateLimitFrequencyInSeconds);
long currentTimeEpochMillis = System.currentTimeMillis();
long currentTimeEpochMinusRateLimitBucket = currentTimeEpochMillis - (rateLimitFrequencyInSeconds * 1000);
String key = nsKey(TASK_RATE_LIMIT_BUCKET, task.getTaskDefName());
Expand All @@ -65,13 +74,13 @@ public boolean exceedsRateLimitPerFrequency(Task task) {
if (currentBucketCount < rateLimitPerFrequency) {
dynoClient.zadd(key, currentTimeEpochMillis, String.valueOf(currentTimeEpochMillis));
dynoClient.expire(key, rateLimitFrequencyInSeconds);
logger.info("Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} within the rate limit with current count {}",
task, rateLimitPerFrequency, rateLimitFrequencyInSeconds, ++currentBucketCount);
logger.info("TaskId: {} with TaskDefinition of: {} has rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} within the rate limit with current count {}",
task.getTaskId(), task.getTaskDefName(), rateLimitPerFrequency, rateLimitFrequencyInSeconds, ++currentBucketCount);
Monitors.recordTaskRateLimited(task.getTaskDefName(), rateLimitPerFrequency);
return false;
} else {
logger.info("Task: {} with rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} is out of bounds of rate limit with current count {}",
task, rateLimitPerFrequency, rateLimitFrequencyInSeconds, currentBucketCount);
logger.info("TaskId: {} with TaskDefinition of: {} has rateLimitPerFrequency: {} and rateLimitFrequencyInSeconds: {} is out of bounds of rate limit with current count {}",
task.getTaskId(), task.getTaskDefName(), rateLimitPerFrequency, rateLimitFrequencyInSeconds, currentBucketCount);
return true;
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.config.TestConfiguration;
import com.netflix.conductor.core.config.Configuration;
Expand All @@ -13,6 +14,8 @@
import org.mockito.runners.MockitoJUnitRunner;
import redis.clients.jedis.commands.JedisCommands;

import java.util.UUID;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand All @@ -34,22 +37,31 @@ public void init() {

@Test
public void testExceedsRateLimitWhenNoRateLimitSet() {
TaskDef taskDef = new TaskDef("TestTaskDefinition");
Task task =new Task();
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task));
task.setTaskId(UUID.randomUUID().toString());
task.setTaskDefName(taskDef.getName());
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task, taskDef));
}
@Test
public void testExceedsRateLimitWithinLimit() {
TaskDef taskDef = new TaskDef("TestTaskDefinition");
taskDef.setRateLimitFrequencyInSeconds(60);
taskDef.setRateLimitPerFrequency(20);
Task task =new Task();
task.setRateLimitFrequencyInSeconds(60);
task.setRateLimitPerFrequency(20);
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task));
task.setTaskId(UUID.randomUUID().toString());
task.setTaskDefName(taskDef.getName());
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task, taskDef));
}
@Test
public void testExceedsRateLimitOutOfLimit() {
TaskDef taskDef = new TaskDef("TestTaskDefinition");
taskDef.setRateLimitFrequencyInSeconds(60);
taskDef.setRateLimitPerFrequency(1);
Task task =new Task();
task.setRateLimitFrequencyInSeconds(60);
task.setRateLimitPerFrequency(1);
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task));
assertTrue(rateLimitingDao.exceedsRateLimitPerFrequency(task));
task.setTaskId(UUID.randomUUID().toString());
task.setTaskDefName(taskDef.getName());
assertFalse(rateLimitingDao.exceedsRateLimitPerFrequency(task, taskDef));
assertTrue(rateLimitingDao.exceedsRateLimitPerFrequency(task, taskDef));
}
}
}

0 comments on commit 73b158b

Please sign in to comment.