Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:Netflix/conductor into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Sep 30, 2020
2 parents 5280f43 + cdc7db8 commit d6b5ca9
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 42 deletions.
Expand Up @@ -23,33 +23,26 @@
import com.google.inject.multibindings.ProvidesIntoMap;
import com.google.inject.multibindings.StringMapKey;
import com.google.inject.name.Named;
import com.netflix.conductor.core.events.ActionProcessor;
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.SimpleActionProcessor;
import com.netflix.conductor.core.events.SimpleEventProcessor;
import com.netflix.conductor.core.events.queue.EventPollSchedulerProvider;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper;
import com.netflix.conductor.core.execution.mapper.DoWhileTaskMapper;
import com.netflix.conductor.core.execution.mapper.DynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ExclusiveJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.JsonJQTransformTaskMapper;
import com.netflix.conductor.core.execution.mapper.KafkaPublishTaskMapper;
import com.netflix.conductor.core.execution.mapper.LambdaTaskMapper;
import com.netflix.conductor.core.execution.mapper.SetVariableTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.TerminateTaskMapper;
import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper;
import com.netflix.conductor.core.execution.mapper.WaitTaskMapper;
import com.netflix.conductor.core.execution.mapper.DoWhileTaskMapper;
import com.netflix.conductor.core.execution.mapper.JsonJQTransformTaskMapper;
import com.netflix.conductor.core.execution.mapper.SetVariableTaskMapper;
import com.netflix.conductor.core.execution.tasks.Event;
import com.netflix.conductor.core.execution.tasks.IsolatedTaskQueueProducer;
import com.netflix.conductor.core.execution.tasks.Lambda;
Expand All @@ -60,34 +53,30 @@
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.core.utils.JsonUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import rx.Scheduler;

import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DECISION;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DO_WHILE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DYNAMIC;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_EVENT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_FORK_JOIN;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_HTTP;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JOIN;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_KAFKA_PUBLISH;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_LAMBDA;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SET_VARIABLE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SIMPLE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_TERMINATE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_USER_DEFINED;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_WAIT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DO_WHILE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SET_VARIABLE;
import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER;
/**
* @author Viren
*/
public class CoreModule extends AbstractModule {

private static final String CONDUCTOR_QUALIFIER = "conductor";
private static final String TASK_MAPPERS_QUALIFIER = "TaskMappers";

@Override
Expand All @@ -101,10 +90,6 @@ protected void configure() {
bind(Terminate.class).asEagerSingleton();
bind(IsolatedTaskQueueProducer.class).asEagerSingleton();
bind(SetVariable.class).asEagerSingleton();
// start processing events when instance starts
bind(ActionProcessor.class).to(SimpleActionProcessor.class);
bind(EventProcessor.class).to(SimpleEventProcessor.class).asEagerSingleton();
bind(Scheduler.class).toProvider(EventPollSchedulerProvider.class).asEagerSingleton();
}

@Provides
Expand All @@ -119,14 +104,6 @@ public JsonUtils getJsonUtils() {
return new JsonUtils();
}

@ProvidesIntoMap
@StringMapKey(CONDUCTOR_QUALIFIER)
@Singleton
@Named(EVENT_QUEUE_PROVIDERS_QUALIFIER)
public EventQueueProvider getDynoEventQueueProvider(QueueDAO queueDAO, Configuration configuration, Scheduler eventScheduler) {
return new DynoEventQueueProvider(queueDAO, configuration, eventScheduler);
}

@ProvidesIntoMap
@StringMapKey(TASK_TYPE_DECISION)
@Singleton
Expand Down
@@ -0,0 +1,39 @@
package com.netflix.conductor.core.config;

import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.google.inject.multibindings.ProvidesIntoMap;
import com.google.inject.multibindings.StringMapKey;
import com.google.inject.name.Named;
import com.netflix.conductor.core.events.ActionProcessor;
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.SimpleActionProcessor;
import com.netflix.conductor.core.events.SimpleEventProcessor;
import com.netflix.conductor.core.events.queue.EventPollSchedulerProvider;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.dao.QueueDAO;
import rx.Scheduler;

import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER;

public class EventModule extends AbstractModule {

public static final String CONDUCTOR_QUALIFIER = "conductor";

@Override
protected void configure() {
// start processing events when instance starts
bind(ActionProcessor.class).to(SimpleActionProcessor.class);
bind(EventProcessor.class).to(SimpleEventProcessor.class).asEagerSingleton();
bind(Scheduler.class).toProvider(EventPollSchedulerProvider.class).asEagerSingleton();
}

@ProvidesIntoMap
@StringMapKey(CONDUCTOR_QUALIFIER)
@Singleton
@Named(EVENT_QUEUE_PROVIDERS_QUALIFIER)
public EventQueueProvider getDynoEventQueueProvider(QueueDAO queueDAO, Configuration configuration, Scheduler eventScheduler) {
return new DynoEventQueueProvider(queueDAO, configuration, eventScheduler);
}
}
Expand Up @@ -31,6 +31,11 @@
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.spotify.futures.CompletableFutures;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -45,10 +50,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Viren
Expand All @@ -65,7 +66,7 @@ public class SimpleEventProcessor implements EventProcessor {
private final ActionProcessor actionProcessor;
private final EventQueues eventQueues;

private ExecutorService executorService;
private final ExecutorService executorService;
private final Map<String, ObservableQueue> eventToQueueMap = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
private final JsonUtils jsonUtils;
Expand Down Expand Up @@ -95,6 +96,7 @@ public SimpleEventProcessor(ExecutionService executionService,
logger.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
} else {
logger.warn("Event processing is DISABLED. executorThreadCount set to {}", executorThreadCount);
executorService = null;
}
}

Expand Down Expand Up @@ -146,7 +148,7 @@ private void listen(ObservableQueue queue) {
queue.observe().subscribe((Message msg) -> handle(queue, msg));
}

private void handle(ObservableQueue queue, Message msg) {
protected void handle(ObservableQueue queue, Message msg) {
try {
if (isEventMessageIndexingEnabled) {
executionService.addMessage(queue.getName(), msg);
Expand Down Expand Up @@ -178,7 +180,7 @@ private void handle(ObservableQueue queue, Message msg) {
*
* @return a list of {@link EventExecution} that failed due to transient failures.
*/
private List<EventExecution> executeEvent(String event, Message msg) throws Exception {
protected List<EventExecution> executeEvent(String event, Message msg) throws Exception {
List<EventHandler> eventHandlerList = metadataService.getEventHandlersForEvent(event, true);
Object payloadObject = getPayloadObject(msg.getPayload());

Expand Down Expand Up @@ -221,7 +223,7 @@ private List<EventExecution> executeEvent(String event, Message msg) throws Exce
* @param msg the {@link Message} that triggered the event
* @return a {@link CompletableFuture} holding a list of {@link EventExecution}s for the {@link Action}s executed in the event handler
*/
private CompletableFuture<List<EventExecution>> executeActionsForEventHandler(EventHandler eventHandler, Message msg) {
protected CompletableFuture<List<EventExecution>> executeActionsForEventHandler(EventHandler eventHandler, Message msg) {
List<CompletableFuture<EventExecution>> futuresList = new ArrayList<>();
int i = 0;
for (Action action : eventHandler.getActions()) {
Expand Down Expand Up @@ -249,7 +251,7 @@ private CompletableFuture<List<EventExecution>> executeActionsForEventHandler(Ev
* the input event execution, if the execution failed due to transient error
*/
@VisibleForTesting
EventExecution execute(EventExecution eventExecution, Action action, Object payload) {
protected EventExecution execute(EventExecution eventExecution, Action action, Object payload) {
try {
String methodName = "executeEventAction";
String description = String.format("Executing action: %s for event: %s with messageId: %s with payload: %s", action.getAction(), eventExecution.getId(), eventExecution.getMessageId(), payload);
Expand Down Expand Up @@ -282,7 +284,7 @@ EventExecution execute(EventExecution eventExecution, Action action, Object payl
* @return true - if the exception is a transient failure
* false - if the exception is non-transient
*/
private boolean isTransientException(Throwable throwableException) {
protected boolean isTransientException(Throwable throwableException) {
if (throwableException != null) {
return !((throwableException instanceof UnsupportedOperationException) ||
(throwableException instanceof ApplicationException && ((ApplicationException) throwableException).getCode() != ApplicationException.Code.BACKEND_ERROR));
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.netflix.conductor.annotations.Service;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.config.CoreModule;
import com.netflix.conductor.core.config.EventModule;
import com.netflix.conductor.core.config.ValidationModule;
import com.netflix.conductor.core.execution.WorkflowSweeper;
import com.netflix.conductor.dyno.SystemPropertiesDynomiteConfiguration;
Expand All @@ -44,6 +45,7 @@ protected void configure() {
install(new HealthModule());
install(new JettyModule());
install(new GRPCModule());
install(new EventModule());

bindInterceptor(Matchers.any(), Matchers.annotatedWith(Service.class), new ServiceInterceptor(getProvider(Validator.class)));
bind(Configuration.class).to(SystemPropertiesDynomiteConfiguration.class);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.config.CoreModule;
import com.netflix.conductor.core.config.EventModule;
import com.netflix.conductor.core.execution.WorkflowStatusListener;
import com.netflix.conductor.core.execution.WorkflowStatusListenerStub;
import com.netflix.conductor.core.utils.NoopLockModule;
Expand All @@ -39,11 +40,10 @@
import com.netflix.conductor.mysql.SystemPropertiesMySQLConfiguration;
import com.netflix.conductor.service.MetadataService;
import com.netflix.conductor.service.MetadataServiceImpl;

import javax.sql.DataSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author jvemugunta
Expand Down Expand Up @@ -72,6 +72,7 @@ protected void configure() {
bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);

install(new CoreModule());
install(new EventModule());
bind(UserTask.class).asEagerSingleton();
bind(ObjectMapper.class).toProvider(JsonMapperProvider.class);
bind(ExternalPayloadStorage.class).to(MockExternalPayloadStorage.class);
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.config.CoreModule;
import com.netflix.conductor.core.config.EventModule;
import com.netflix.conductor.core.execution.WorkflowStatusListener;
import com.netflix.conductor.core.execution.WorkflowStatusListenerStub;
import com.netflix.conductor.core.utils.LocalOnlyLockModule;
Expand All @@ -42,6 +43,7 @@
import com.netflix.conductor.service.MetadataServiceImpl;
import com.netflix.dyno.queues.redis.RedisQueues;
import com.netflix.dyno.queues.redis.sharding.ShardingStrategy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -81,6 +83,7 @@ protected void configure() {
bind(MetadataService.class).to(MetadataServiceImpl.class);

install(new CoreModule());
install(new EventModule());
bind(UserTask.class).asEagerSingleton();
bind(ObjectMapper.class).toProvider(JsonMapperProvider.class);
bind(ExternalPayloadStorage.class).to(MockExternalPayloadStorage.class);
Expand Down

0 comments on commit d6b5ca9

Please sign in to comment.