Navigation Menu

Skip to content

Commit

Permalink
Simplification of configuration of Sagas
Browse files Browse the repository at this point in the history
Extracted SagaConfiguration to an interface, with methods returning
the actual instances of elements configured, instead of components.
  • Loading branch information
abuijze committed Oct 11, 2018
1 parent 23abcf7 commit fe0c680
Show file tree
Hide file tree
Showing 11 changed files with 426 additions and 384 deletions.
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -81,7 +81,6 @@ private List<AbstractSagaManager> retrieveSagaManagers() {
.sagaConfigurations()
.stream()
.map(SagaConfiguration::manager)
.map(Component::get)
.collect(toList());
}
}
Expand Up @@ -172,6 +172,19 @@ default <T extends EventProcessor> Optional<T> eventProcessorByProcessingGroup(S
*/
List<SagaConfiguration<?>> sagaConfigurations();

/**
* Returns the {@link SagaConfiguration} for the given {@code sagaType}. If no configuration has been provided for
* a Saga of this type, {@code null} is returned.
*
* @param sagaType The type of Saga to return the configuration for.
* @param <T> The type of Saga
* @return the configuration for the Saga, or {@code null} if not found
*/
@SuppressWarnings("unchecked")
default <T> SagaConfiguration<T> sagaConfiguration(Class<T> sagaType) {
return (SagaConfiguration<T>) sagaConfigurations().stream().filter(c -> sagaType.equals(c.type())).findFirst().orElse(null);
}

/**
* Returns the {@link MessageMonitor} set to the given {@code componentType} and {@code componentName} registered
* within this configuration.
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.axonframework.monitoring.MessageMonitor;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -42,25 +43,44 @@
*/
public interface EventProcessingConfigurer {

/**
* Contract which defines how to build an event processor.
*/
@FunctionalInterface interface EventProcessorBuilder {

/**
* Builds an {@link EventProcessor} with the given {@code name}, {@link Configuration} and
* {@link EventHandlerInvoker}.
*
* @param name a {@link String} specifying the name of the {@link EventProcessor} to create
* @param configuration the global {@link Configuration} the implementation may use to obtain dependencies
* @param eventHandlerInvoker the {@link EventHandlerInvoker} assigned to the {@link EventProcessor} to be
* created, used to invoke event handlers
* @return an {@link EventProcessor}
*/
EventProcessor build(String name, Configuration configuration, EventHandlerInvoker eventHandlerInvoker);
}

/**
* Registers a Saga with default configuration within this Configurer.
*
* @param sagaType the type of Saga
* @param <T> the type of Saga
* @return the current {@link EventProcessingConfigurer} instance, for fluent interfacing
* @see SagaConfiguration#defaultConfiguration(Class)
*/
default <T> EventProcessingConfigurer registerSaga(Class<T> sagaType) {
return registerSagaConfiguration(SagaConfiguration.defaultConfiguration(sagaType));
return registerSaga(sagaType, c -> {});
}

/**
* Registers a {@link SagaConfiguration}.
* Registers a Saga, allowing specific configuration to use for this Saga type.
*
* @param sagaConfiguration a {@link SagaConfiguration}
* @param <T> The type of Saga to configure
* @param sagaType The type of Saga to configure
* @param sagaConfigurer a function providing modifications on top of the defaul configuration
* @return the current {@link EventProcessingConfigurer} instance, for fluent interfacing
*/
EventProcessingConfigurer registerSagaConfiguration(SagaConfiguration<?> sagaConfiguration);
<T> EventProcessingConfigurer registerSaga(Class<T> sagaType, Consumer<SagaConfigurer<T>> sagaConfigurer);

/**
* Registers a {@link Function} that builds a {@link SagaStore}.
Expand Down Expand Up @@ -449,23 +469,4 @@ EventProcessingConfigurer registerRollbackConfiguration(String name,
EventProcessingConfigurer registerTransactionManager(String name,
Function<Configuration, TransactionManager> transactionManagerBuilder);

/**
* Contract which defines how to build an event processor.
*/
@FunctionalInterface
interface EventProcessorBuilder {

/**
* Builds an {@link EventProcessor} with the given {@code name}, {@link Configuration} and
* {@link EventHandlerInvoker}.
*
* @param name a {@link String} specifying the name of the {@link EventProcessor} to create
* @param configuration the global {@link Configuration} the implementation may use to obtain dependencies
* @param eventHandlerInvoker the {@link EventHandlerInvoker} assigned to the {@link EventProcessor} to be
* created, used to invoke event handlers
* @return an {@link EventProcessor}
*/
EventProcessor build(String name, Configuration configuration, EventHandlerInvoker eventHandlerInvoker);
}

}
Expand Up @@ -37,6 +37,7 @@

import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -59,7 +60,7 @@ public class EventProcessingModule

private final List<TypeProcessingGroupSelector> typeSelectors = new ArrayList<>();
private final List<InstanceProcessingGroupSelector> instanceSelectors = new ArrayList<>();
private final List<SagaConfiguration<?>> sagaConfigurations = new ArrayList<>();
private final List<SagaConfigurer<?>> sagaConfigurations = new ArrayList<>();
private final List<Component<Object>> eventHandlerBuilders = new ArrayList<>();
private final Map<String, Component<ListenerInvocationErrorHandler>> listenerInvocationErrorHandlers = new HashMap<>();
private final Map<String, Component<ErrorHandler>> errorHandlers = new HashMap<>();
Expand Down Expand Up @@ -202,11 +203,11 @@ private void registerSimpleEventHandlerInvokers(

private void registerSagaManagers(Map<String, List<Function<Configuration, EventHandlerInvoker>>> handlerInvokers) {
sagaConfigurations.forEach(sc -> {
sc.initialize(configuration);
String processingGroup = selectProcessingGroupByType(sc.type());
SagaConfiguration<?> sagaConfig = sc.initialize(configuration);
String processingGroup = selectProcessingGroupByType(sagaConfig.type());
String processorName = processorNameForProcessingGroup(processingGroup);
handlerInvokers.computeIfAbsent(processorName, k -> new ArrayList<>())
.add(c -> sc.manager().get());
.add(c -> sagaConfig.manager());
});
}

Expand Down Expand Up @@ -308,7 +309,7 @@ public SagaStore sagaStore() {
@Override
public List<SagaConfiguration<?>> sagaConfigurations() {
ensureInitialized();
return new ArrayList<>(sagaConfigurations);
return sagaConfigurations.stream().map(sc -> sc.initialize(configuration)).collect(Collectors.toList());
}

private String processorNameForProcessingGroup(String processingGroup) {
Expand Down Expand Up @@ -352,9 +353,12 @@ private void ensureInitialized() {
}

//<editor-fold desc="configurer methods">

@Override
public EventProcessingConfigurer registerSagaConfiguration(SagaConfiguration<?> sagaConfiguration) {
this.sagaConfigurations.add(sagaConfiguration);
public <T> EventProcessingConfigurer registerSaga(Class<T> sagaType, Consumer<SagaConfigurer<T>> sagaConfigurer) {
SagaConfigurer<T> configurer = SagaConfigurer.forType(sagaType);
sagaConfigurer.accept(configurer);
this.sagaConfigurations.add(configurer);
return this;
}

Expand Down

0 comments on commit fe0c680

Please sign in to comment.