diff --git a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java index 5122840233..7e9f5f1659 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java +++ b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java @@ -23,15 +23,9 @@ import com.google.inject.multibindings.ProvidesIntoMap; import com.google.inject.multibindings.StringMapKey; import com.google.inject.name.Named; -import com.netflix.conductor.core.events.ActionProcessor; -import com.netflix.conductor.core.events.EventProcessor; -import com.netflix.conductor.core.events.EventQueueProvider; -import com.netflix.conductor.core.events.SimpleActionProcessor; -import com.netflix.conductor.core.events.SimpleEventProcessor; -import com.netflix.conductor.core.events.queue.EventPollSchedulerProvider; -import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider; import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper; +import com.netflix.conductor.core.execution.mapper.DoWhileTaskMapper; import com.netflix.conductor.core.execution.mapper.DynamicTaskMapper; import com.netflix.conductor.core.execution.mapper.EventTaskMapper; import com.netflix.conductor.core.execution.mapper.ExclusiveJoinTaskMapper; @@ -39,17 +33,16 @@ import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper; import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper; import com.netflix.conductor.core.execution.mapper.JoinTaskMapper; +import com.netflix.conductor.core.execution.mapper.JsonJQTransformTaskMapper; import com.netflix.conductor.core.execution.mapper.KafkaPublishTaskMapper; import com.netflix.conductor.core.execution.mapper.LambdaTaskMapper; +import com.netflix.conductor.core.execution.mapper.SetVariableTaskMapper; import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper; import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper; import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.execution.mapper.TerminateTaskMapper; import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper; import com.netflix.conductor.core.execution.mapper.WaitTaskMapper; -import com.netflix.conductor.core.execution.mapper.DoWhileTaskMapper; -import com.netflix.conductor.core.execution.mapper.JsonJQTransformTaskMapper; -import com.netflix.conductor.core.execution.mapper.SetVariableTaskMapper; import com.netflix.conductor.core.execution.tasks.Event; import com.netflix.conductor.core.execution.tasks.IsolatedTaskQueueProducer; import com.netflix.conductor.core.execution.tasks.Lambda; @@ -60,10 +53,9 @@ import com.netflix.conductor.core.execution.tasks.Wait; import com.netflix.conductor.core.utils.JsonUtils; import com.netflix.conductor.dao.MetadataDAO; -import com.netflix.conductor.dao.QueueDAO; -import rx.Scheduler; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DECISION; +import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DO_WHILE; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DYNAMIC; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_EVENT; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_EXCLUSIVE_JOIN; @@ -71,23 +63,20 @@ import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_HTTP; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JOIN; +import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_KAFKA_PUBLISH; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_LAMBDA; +import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SET_VARIABLE; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SIMPLE; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SUB_WORKFLOW; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_TERMINATE; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_USER_DEFINED; import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_WAIT; -import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_DO_WHILE; -import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JSON_JQ_TRANSFORM; -import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SET_VARIABLE; -import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER; /** * @author Viren */ public class CoreModule extends AbstractModule { - private static final String CONDUCTOR_QUALIFIER = "conductor"; private static final String TASK_MAPPERS_QUALIFIER = "TaskMappers"; @Override @@ -101,10 +90,6 @@ protected void configure() { bind(Terminate.class).asEagerSingleton(); bind(IsolatedTaskQueueProducer.class).asEagerSingleton(); bind(SetVariable.class).asEagerSingleton(); - // start processing events when instance starts - bind(ActionProcessor.class).to(SimpleActionProcessor.class); - bind(EventProcessor.class).to(SimpleEventProcessor.class).asEagerSingleton(); - bind(Scheduler.class).toProvider(EventPollSchedulerProvider.class).asEagerSingleton(); } @Provides @@ -119,14 +104,6 @@ public JsonUtils getJsonUtils() { return new JsonUtils(); } - @ProvidesIntoMap - @StringMapKey(CONDUCTOR_QUALIFIER) - @Singleton - @Named(EVENT_QUEUE_PROVIDERS_QUALIFIER) - public EventQueueProvider getDynoEventQueueProvider(QueueDAO queueDAO, Configuration configuration, Scheduler eventScheduler) { - return new DynoEventQueueProvider(queueDAO, configuration, eventScheduler); - } - @ProvidesIntoMap @StringMapKey(TASK_TYPE_DECISION) @Singleton diff --git a/core/src/main/java/com/netflix/conductor/core/config/EventModule.java b/core/src/main/java/com/netflix/conductor/core/config/EventModule.java new file mode 100644 index 0000000000..a4b2128c53 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/config/EventModule.java @@ -0,0 +1,39 @@ +package com.netflix.conductor.core.config; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.multibindings.ProvidesIntoMap; +import com.google.inject.multibindings.StringMapKey; +import com.google.inject.name.Named; +import com.netflix.conductor.core.events.ActionProcessor; +import com.netflix.conductor.core.events.EventProcessor; +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.SimpleActionProcessor; +import com.netflix.conductor.core.events.SimpleEventProcessor; +import com.netflix.conductor.core.events.queue.EventPollSchedulerProvider; +import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider; +import com.netflix.conductor.dao.QueueDAO; +import rx.Scheduler; + +import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER; + +public class EventModule extends AbstractModule { + + public static final String CONDUCTOR_QUALIFIER = "conductor"; + + @Override + protected void configure() { + // start processing events when instance starts + bind(ActionProcessor.class).to(SimpleActionProcessor.class); + bind(EventProcessor.class).to(SimpleEventProcessor.class).asEagerSingleton(); + bind(Scheduler.class).toProvider(EventPollSchedulerProvider.class).asEagerSingleton(); + } + + @ProvidesIntoMap + @StringMapKey(CONDUCTOR_QUALIFIER) + @Singleton + @Named(EVENT_QUEUE_PROVIDERS_QUALIFIER) + public EventQueueProvider getDynoEventQueueProvider(QueueDAO queueDAO, Configuration configuration, Scheduler eventScheduler) { + return new DynoEventQueueProvider(queueDAO, configuration, eventScheduler); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java index 46916fa2d0..6ec9584213 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java @@ -31,6 +31,11 @@ import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; import com.spotify.futures.CompletableFutures; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -45,10 +50,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.inject.Inject; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author Viren @@ -65,7 +66,7 @@ public class SimpleEventProcessor implements EventProcessor { private final ActionProcessor actionProcessor; private final EventQueues eventQueues; - private ExecutorService executorService; + private final ExecutorService executorService; private final Map eventToQueueMap = new ConcurrentHashMap<>(); private final ObjectMapper objectMapper; private final JsonUtils jsonUtils; @@ -95,6 +96,7 @@ public SimpleEventProcessor(ExecutionService executionService, logger.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount); } else { logger.warn("Event processing is DISABLED. executorThreadCount set to {}", executorThreadCount); + executorService = null; } } @@ -146,7 +148,7 @@ private void listen(ObservableQueue queue) { queue.observe().subscribe((Message msg) -> handle(queue, msg)); } - private void handle(ObservableQueue queue, Message msg) { + protected void handle(ObservableQueue queue, Message msg) { try { if (isEventMessageIndexingEnabled) { executionService.addMessage(queue.getName(), msg); @@ -178,7 +180,7 @@ private void handle(ObservableQueue queue, Message msg) { * * @return a list of {@link EventExecution} that failed due to transient failures. */ - private List executeEvent(String event, Message msg) throws Exception { + protected List executeEvent(String event, Message msg) throws Exception { List eventHandlerList = metadataService.getEventHandlersForEvent(event, true); Object payloadObject = getPayloadObject(msg.getPayload()); @@ -221,7 +223,7 @@ private List executeEvent(String event, Message msg) throws Exce * @param msg the {@link Message} that triggered the event * @return a {@link CompletableFuture} holding a list of {@link EventExecution}s for the {@link Action}s executed in the event handler */ - private CompletableFuture> executeActionsForEventHandler(EventHandler eventHandler, Message msg) { + protected CompletableFuture> executeActionsForEventHandler(EventHandler eventHandler, Message msg) { List> futuresList = new ArrayList<>(); int i = 0; for (Action action : eventHandler.getActions()) { @@ -249,7 +251,7 @@ private CompletableFuture> executeActionsForEventHandler(Ev * the input event execution, if the execution failed due to transient error */ @VisibleForTesting - EventExecution execute(EventExecution eventExecution, Action action, Object payload) { + protected EventExecution execute(EventExecution eventExecution, Action action, Object payload) { try { String methodName = "executeEventAction"; String description = String.format("Executing action: %s for event: %s with messageId: %s with payload: %s", action.getAction(), eventExecution.getId(), eventExecution.getMessageId(), payload); @@ -282,7 +284,7 @@ EventExecution execute(EventExecution eventExecution, Action action, Object payl * @return true - if the exception is a transient failure * false - if the exception is non-transient */ - private boolean isTransientException(Throwable throwableException) { + protected boolean isTransientException(Throwable throwableException) { if (throwableException != null) { return !((throwableException instanceof UnsupportedOperationException) || (throwableException instanceof ApplicationException && ((ApplicationException) throwableException).getCode() != ApplicationException.Code.BACKEND_ERROR)); diff --git a/server/src/main/java/com/netflix/conductor/server/ServerModule.java b/server/src/main/java/com/netflix/conductor/server/ServerModule.java index 747b57becb..ba78ba5605 100644 --- a/server/src/main/java/com/netflix/conductor/server/ServerModule.java +++ b/server/src/main/java/com/netflix/conductor/server/ServerModule.java @@ -19,6 +19,7 @@ import com.netflix.conductor.annotations.Service; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.config.EventModule; import com.netflix.conductor.core.config.ValidationModule; import com.netflix.conductor.core.execution.WorkflowSweeper; import com.netflix.conductor.dyno.SystemPropertiesDynomiteConfiguration; @@ -44,6 +45,7 @@ protected void configure() { install(new HealthModule()); install(new JettyModule()); install(new GRPCModule()); + install(new EventModule()); bindInterceptor(Matchers.any(), Matchers.annotatedWith(Service.class), new ServiceInterceptor(getProvider(Validator.class))); bind(Configuration.class).to(SystemPropertiesDynomiteConfiguration.class); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java index 8a9560350d..f451e5b9e1 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java @@ -21,6 +21,7 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.config.EventModule; import com.netflix.conductor.core.execution.WorkflowStatusListener; import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; import com.netflix.conductor.core.utils.NoopLockModule; @@ -39,11 +40,10 @@ import com.netflix.conductor.mysql.SystemPropertiesMySQLConfiguration; import com.netflix.conductor.service.MetadataService; import com.netflix.conductor.service.MetadataServiceImpl; + +import javax.sql.DataSource; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import javax.sql.DataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author jvemugunta @@ -72,6 +72,7 @@ protected void configure() { bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class); install(new CoreModule()); + install(new EventModule()); bind(UserTask.class).asEagerSingleton(); bind(ObjectMapper.class).toProvider(JsonMapperProvider.class); bind(ExternalPayloadStorage.class).to(MockExternalPayloadStorage.class); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java index 69f1f2caa4..c4d633528f 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java @@ -19,6 +19,7 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.config.EventModule; import com.netflix.conductor.core.execution.WorkflowStatusListener; import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; import com.netflix.conductor.core.utils.LocalOnlyLockModule; @@ -42,6 +43,7 @@ import com.netflix.conductor.service.MetadataServiceImpl; import com.netflix.dyno.queues.redis.RedisQueues; import com.netflix.dyno.queues.redis.sharding.ShardingStrategy; + import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -81,6 +83,7 @@ protected void configure() { bind(MetadataService.class).to(MetadataServiceImpl.class); install(new CoreModule()); + install(new EventModule()); bind(UserTask.class).asEagerSingleton(); bind(ObjectMapper.class).toProvider(JsonMapperProvider.class); bind(ExternalPayloadStorage.class).to(MockExternalPayloadStorage.class);