Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1819 from Netflix/minor_enhancements
Browse files Browse the repository at this point in the history
extend rate limit feature to simple tasks; fix start time for lambda task
  • Loading branch information
apanicker-nflx committed Aug 4, 2020
2 parents 1a7a206 + e39c3d2 commit 7d1caa2
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 95 deletions.
@@ -1,71 +1,67 @@
/**
* Copyright 2018 Netflix, Inc.
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.core.execution.mapper;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.tasks.Lambda;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author x-ultra
*/
public class LambdaTaskMapper implements TaskMapper {

public static final Logger logger = LoggerFactory.getLogger(LambdaTaskMapper.class);
private ParametersUtils parametersUtils;
public static final Logger LOGGER = LoggerFactory.getLogger(LambdaTaskMapper.class);
private final ParametersUtils parametersUtils;

public LambdaTaskMapper(ParametersUtils parametersUtils) {
this.parametersUtils = parametersUtils;
}


@Override
public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {

logger.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext);
LOGGER.debug("TaskMapperContext {} in LambdaTaskMapper", taskMapperContext);

WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
Workflow workflowInstance = taskMapperContext.getWorkflowInstance();
String taskId = taskMapperContext.getTaskId();

Map<String, Object> taskInput = parametersUtils.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null);
Map<String, Object> taskInput = parametersUtils
.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null);

Task task = new Task();
task.setTaskType(Lambda.TASK_NAME);
task.setTaskDefName(taskMapperContext.getTaskToSchedule().getName());
task.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName());
task.setWorkflowInstanceId(workflowInstance.getWorkflowId());
task.setWorkflowType(workflowInstance.getWorkflowName());
task.setCorrelationId(workflowInstance.getCorrelationId());
task.setScheduledTime(System.currentTimeMillis());
task.setInputData(taskInput);
task.setTaskId(taskId);
task.setStatus(Task.Status.IN_PROGRESS);
task.setWorkflowTask(taskToSchedule);
task.setWorkflowPriority(workflowInstance.getPriority());
Task lambdaTask = new Task();
lambdaTask.setTaskType(Lambda.TASK_NAME);
lambdaTask.setTaskDefName(taskMapperContext.getTaskToSchedule().getName());
lambdaTask.setReferenceTaskName(taskMapperContext.getTaskToSchedule().getTaskReferenceName());
lambdaTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
lambdaTask.setWorkflowType(workflowInstance.getWorkflowName());
lambdaTask.setCorrelationId(workflowInstance.getCorrelationId());
lambdaTask.setStartTime(System.currentTimeMillis());
lambdaTask.setScheduledTime(System.currentTimeMillis());
lambdaTask.setInputData(taskInput);
lambdaTask.setTaskId(taskId);
lambdaTask.setStatus(Task.Status.IN_PROGRESS);
lambdaTask.setWorkflowTask(taskToSchedule);
lambdaTask.setWorkflowPriority(workflowInstance.getPriority());

return Arrays.asList(task);
return Collections.singletonList(lambdaTask);
}
}
@@ -1,20 +1,15 @@
/**
* Copyright 2018 Netflix, Inc.
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/


package com.netflix.conductor.core.execution.mapper;

import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -25,14 +20,12 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link TaskType#SIMPLE}
Expand All @@ -42,7 +35,7 @@ public class SimpleTaskMapper implements TaskMapper {

public static final Logger logger = LoggerFactory.getLogger(SimpleTaskMapper.class);

private ParametersUtils parametersUtils;
private final ParametersUtils parametersUtils;

public SimpleTaskMapper(ParametersUtils parametersUtils) {
this.parametersUtils = parametersUtils;
Expand Down Expand Up @@ -91,6 +84,8 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
simpleTask.setWorkflowTask(taskToSchedule);
simpleTask.setRetriedTaskId(retriedTaskId);
simpleTask.setWorkflowPriority(workflowInstance.getPriority());
simpleTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
simpleTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
return Collections.singletonList(simpleTask);
}
}
46 changes: 21 additions & 25 deletions core/src/main/java/com/netflix/conductor/metrics/Monitors.java
@@ -1,17 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.metrics;

Expand All @@ -21,6 +18,7 @@
import com.netflix.servo.monitor.BasicStopwatch;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
Expand All @@ -30,26 +28,24 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author Viren
*
*/
public class Monitors {

private static Registry registry = Spectator.globalRegistry();
private static final Registry registry = Spectator.globalRegistry();

private static Map<String, Map<Map<String, String>, Counter>> counters = new ConcurrentHashMap<>();
private static final Map<String, Map<Map<String, String>, Counter>> counters = new ConcurrentHashMap<>();

private static Map<String, Map<Map<String, String>, PercentileTimer>> timers = new ConcurrentHashMap<>();
private static final Map<String, Map<Map<String, String>, PercentileTimer>> timers = new ConcurrentHashMap<>();

private static Map<String, Map<Map<String, String>, AtomicLong>> gauges = new ConcurrentHashMap<>();
private static final Map<String, Map<Map<String, String>, Gauge>> gauges = new ConcurrentHashMap<>();

public static final String classQualifier = "WorkflowMonitor";

private Monitors() {

}

/**
Expand Down Expand Up @@ -91,10 +87,10 @@ private static void counter(String className, String name, String... additionalT
* @param additionalTags
*/
private static void gauge(String className, String name, long measurement, String... additionalTags) {
getGauge(className, name, additionalTags).getAndSet(measurement);
getGauge(className, name, additionalTags).set(measurement);
}

public static Timer getTimer(String className, String name, String... additionalTags) {
private static Timer getTimer(String className, String name, String... additionalTags) {
Map<String, String> tags = toMap(className, additionalTags);
return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Id id = registry.createId(name, tags);
Expand All @@ -111,12 +107,12 @@ private static Counter getCounter(String className, String name, String... addit
});
}

private static AtomicLong getGauge(String className, String name, String... additionalTags) {
private static Gauge getGauge(String className, String name, String... additionalTags) {
Map<String, String> tags = toMap(className, additionalTags);

return gauges.computeIfAbsent(name, s -> new ConcurrentHashMap<>()).computeIfAbsent(tags, t -> {
Id id = registry.createId(name, tags);
return registry.gauge(id, new AtomicLong(0));
return registry.gauge(id);
});
}

Expand Down Expand Up @@ -291,7 +287,7 @@ public static void recordESIndexTime(String action, String docType, long val) {
}

public static void recordWorkerQueueSize(String queueType, int val) {
getGauge(Monitors.classQualifier, "indexing_worker_queue", "queueType", queueType).set(val);
gauge(Monitors.classQualifier, "indexing_worker_queue", val, "queueType", queueType);
}

public static void recordDiscardedIndexingCount(String queueType) {
Expand All @@ -311,7 +307,7 @@ public static void recordWorkflowArchived(String workflowType, WorkflowStatus st
}

public static void recordArchivalDelayQueueSize(int val) {
getGauge(classQualifier, "workflow_archival_delay_queue_size").set(val);
gauge(classQualifier, "workflow_archival_delay_queue_size", val);
}
public static void recordDiscardedArchivalCount() {
counter(classQualifier, "discarded_archival_count");
Expand All @@ -322,6 +318,6 @@ public static void recordSystemTaskWorkerPollingLimited(String queueName) {
}

public static void recordEventQueuePollSize(String queueType, int val) {
getGauge(Monitors.classQualifier, "event_queue_poll", "queueType", queueType).set(val);
gauge(Monitors.classQualifier, "event_queue_poll", val, "queueType", queueType);
}
}
Expand Up @@ -17,6 +17,7 @@
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
Expand Down Expand Up @@ -143,11 +144,18 @@ public List<Task> poll(String taskType, String workerId, String domain, int coun
}

if (executionDAOFacade.exceedsInProgressLimit(task)) {
// Postpone a message, so that it would be available for poll again.
// Postpone this message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds);
logger.debug("Postponed task: {} in queue: {} by {} seconds", taskId, queueName, queueTaskMessagePostponeSeconds);
continue;
}
TaskDef taskDef = task.getTaskDefinition().isPresent() ? task.getTaskDefinition().get() : null;
if (task.getRateLimitPerFrequency() > 0 && executionDAOFacade.exceedsRateLimitPerFrequency(task, taskDef)) {
// Postpone this message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds);
logger.debug("RateLimit Execution limited for {}:{}, limit:{}", taskId, task.getTaskDefName(), task.getRateLimitPerFrequency());
continue;
}

task.setStatus(Status.IN_PROGRESS);
if (task.getStartTime() == 0) {
Expand Down
Expand Up @@ -136,17 +136,22 @@ class WorkflowTestUtil {

TaskDef rateLimitedTask = new TaskDef()
rateLimitedTask.name = 'test_task_with_rateLimits'
rateLimitedTask.rateLimitFrequencyInSeconds = 600
rateLimitedTask.rateLimitFrequencyInSeconds = 10
rateLimitedTask.rateLimitPerFrequency = 1

TaskDef rateLimitedSimpleTask = new TaskDef()
rateLimitedSimpleTask.name = 'test_simple_task_with_rateLimits'
rateLimitedSimpleTask.rateLimitFrequencyInSeconds = 10
rateLimitedSimpleTask.rateLimitPerFrequency = 1

TaskDef eventTaskX = new TaskDef()
eventTaskX.name = 'eventX'
eventTaskX.timeoutSeconds = 1

metadataService.registerTaskDef(
[taskWithResponseTimeOut, optionalTask, simpleSubWorkflowTask,
subWorkflowTask, waitTimeOutTask, userTask, eventTaskX,
rateLimitedTask, concurrentExecutionLimitedTask]
rateLimitedTask, rateLimitedSimpleTask, concurrentExecutionLimitedTask]
)
}

Expand Down

0 comments on commit 7d1caa2

Please sign in to comment.