Skip to content

Commit

Permalink
Code review remarks.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed May 7, 2018
1 parent 437077c commit 87344e4
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 35 deletions.
Expand Up @@ -43,7 +43,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -135,14 +134,15 @@ public EventProcessorRegistry registerHandlerInterceptor(String processorName,
}

@Override
public Optional<EventProcessor> eventProcessor(String name) {
public List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(String processorName) {
Assert.state(configuration != null, () -> "Configuration is not initialized yet");
if (eventProcessors.containsKey(name)) {
return Optional.of(eventProcessors.get(name).get());
}
return Optional.empty();
return handlerInterceptorsBuilders.getOrDefault(processorName, new ArrayList<>())
.stream()
.map(hi -> hi.apply(configuration))
.collect(Collectors.toList());
}


@Override
public EventProcessorRegistry registerEventProcessorFactory(EventProcessorBuilder eventProcessorBuilder) {
this.defaultEventProcessorBuilder = eventProcessorBuilder;
Expand Down
Expand Up @@ -124,7 +124,9 @@ public EventHandlingConfiguration() {
*
* @see EventHandlingConfiguration#registerHandlerInterceptor(BiFunction)
* @see EventHandlingConfiguration#registerHandlerInterceptor(String, Function)
* @deprecated use {@link EventProcessorRegistry#interceptorsFor(String)} instead
*/
@Deprecated
public List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(Configuration configuration,
String processorName) {
List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new ArrayList<>();
Expand Down Expand Up @@ -603,7 +605,9 @@ public EventHandlingConfiguration registerTokenStore(String name, Function<Confi
* configuration hasn't been {@link #initialize(Configuration) initialized} yet.
*
* @return a read-only list of processors initialized in this configuration.
* @deprecated use {@link EventProcessorRegistry#eventProcessors()} instead
*/
@Deprecated
public List<EventProcessor> getProcessors() {
Assert.state(config != null, () -> "Configuration is not initialized yet");
return new ArrayList<>(config.eventProcessorRegistry().eventProcessors().values());
Expand All @@ -615,7 +619,9 @@ public List<EventProcessor> getProcessors() {
*
* @param name The name of the processor to return
* @return an Optional referencing the processor, if present.
* @deprecated use {@link EventProcessorRegistry#eventProcessor(String)} instead
*/
@Deprecated
public <T extends EventProcessor> Optional<T> getProcessor(String name) {
Assert.state(config != null, () -> "Configuration is not initialized yet");
//noinspection unchecked
Expand All @@ -631,9 +637,12 @@ public <T extends EventProcessor> Optional<T> getProcessor(String name) {
* @param expectedType The type of processor expected
* @param <T> The type of processor expected
* @return an Optional referencing the processor, if present and of expected type.
* @deprecated use {@link EventProcessorRegistry#eventProcessor(String, Class)} instead
*/
@Deprecated
public <T extends EventProcessor> Optional<T> getProcessor(String name, Class<T> expectedType) {
return getProcessor(name).filter(expectedType::isInstance).map(expectedType::cast);
Assert.state(config != null, () -> "Configuration is not initialized yet");
return config.eventProcessorRegistry().eventProcessor(name, expectedType);
}

/**
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.monitoring.MessageMonitor;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -85,6 +86,14 @@ EventProcessorRegistry registerHandlerInvoker(String processorName,
EventProcessorRegistry registerHandlerInterceptor(String processorName,
Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder);

/**
* Returns a list of interceptors for a processor with given {@code processorName}.
*
* @param processorName The name of the processor
* @return a list of interceptors for a processor with given {@code processorName}
*/
List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(String processorName);

/**
* Allows for more fine-grained definition of the Event Processor to use for each group of Event Listeners. The
* given builder is expected to create a fully initialized Event Processor implementation based on the name and
Expand Down Expand Up @@ -149,7 +158,7 @@ default EventProcessorRegistry configureMessageMonitor(String name,
EventProcessorRegistry configureMessageMonitor(String name, MessageMonitorFactory messageMonitorFactory);

/**
* Register a TrackingProcessor using default configuration for the given {@code name}. Unlike
* Register a TrackingEventProcessor using default configuration for the given {@code name}. Unlike
* {@link #usingTrackingProcessors()}, this method will not default all processors to tracking, but instead only
* use tracking for event handler that have been assigned to the processor with given {@code name}.
* <p>
Expand All @@ -163,9 +172,9 @@ default EventProcessorRegistry registerTrackingEventProcessor(String name) {
}

/**
* Registers a TrackingProcessor using the given {@code source} to read messages from.
* Registers a TrackingEventProcessor using the given {@code source} to read messages from.
*
* @param name The name of the TrackingProcessor
* @param name The name of the TrackingEventProcessor
* @param source The source of messages for this processor
* @return event processor registry for chaining purposes
*/
Expand All @@ -178,8 +187,8 @@ default EventProcessorRegistry registerTrackingEventProcessorUsingSource(String
}

/**
* Registers a TrackingProcessor with the given {@code name}, reading from the Event Bus (or Store) from the main
* configuration and using the given {@code processorConfiguration}. The given {@code sequencingPolicy} defines
* Registers a TrackingEventProcessor with the given {@code name}, reading from the Event Bus (or Store) from the
* main configuration and using the given {@code processorConfiguration}. The given {@code sequencingPolicy} defines
* the policy for events that need to be executed sequentially.
*
* @param name The name of the Tracking Processor
Expand All @@ -192,9 +201,9 @@ default EventProcessorRegistry registerTrackingEventProcessor(String name,
}

/**
* Registers a TrackingProcessor with the given {@code name}, reading from the given {@code source} and using the
* given {@code processorConfiguration}. The given {@code sequencingPolicy} defines the policy for events that need
* to be executed sequentially.
* Registers a TrackingEventProcessor with the given {@code name}, reading from the given {@code source} and using
* the given {@code processorConfiguration}. The given {@code sequencingPolicy} defines the policy for events that
* need to be executed sequentially.
*
* @param name The name of the Tracking Processor
* @param source The source to read Events from
Expand All @@ -206,7 +215,7 @@ EventProcessorRegistry registerTrackingEventProcessor(String name,
Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration);

/**
* Register a subscribing processor with given {@code name} that subscribes to the Event Bus.
* Register a subscribing event processor with given {@code name} that subscribes to the Event Bus.
*
* @param name The name of the Event Processor
* @return event processor registry for chaining purposes
Expand All @@ -216,8 +225,9 @@ default EventProcessorRegistry registerSubscribingEventProcessor(String name) {
}

/**
* Register a subscribing processor with given {@code name} that subscribes to the given {@code messageSource}.
* This allows the use of standard Subscribing Processors that listen to another source than the Event Bus.
* Register a subscribing event processor with given {@code name} that subscribes to the given {@code
* messageSource}. This allows the use of standard Subscribing Event Processors that listen to another source than
* the Event Bus.
*
* @param name The name of the Event Processor
* @param messageSource The source the processor should read from
Expand Down Expand Up @@ -285,7 +295,18 @@ default EventProcessorRegistry usingTrackingProcessors() {
* @param name The name of the event processor
* @return optional whether event processor with given name exists
*/
Optional<EventProcessor> eventProcessor(String name);
@SuppressWarnings("unchecked")
default <T extends EventProcessor> Optional<T> eventProcessor(String name) {
Map<String, EventProcessor> eventProcessors = eventProcessors();
if (eventProcessors.containsKey(name)) {
return (Optional<T>) Optional.of(eventProcessors.get(name));
}
return Optional.empty();
}

default <T extends EventProcessor> Optional<T> eventProcessor(String name, Class<T> expectedType) {
return eventProcessor(name).filter(expectedType::isInstance).map(expectedType::cast);
}

/**
* Initializes the Event Processor Registry with global configuration. Initializing means that all event processor
Expand Down
Expand Up @@ -38,8 +38,8 @@
*/
public class SimpleEventHandlerInvoker implements EventHandlerInvoker {

private final List<?> eventListenerDelegates;
private final List<EventListener> eventListeners;
private final List<?> eventListeners;
private final List<EventListener> wrappedEventListeners;
private final ListenerInvocationErrorHandler listenerInvocationErrorHandler;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;

Expand Down Expand Up @@ -103,12 +103,12 @@ public SimpleEventHandlerInvoker(List<?> eventListeners,
public SimpleEventHandlerInvoker(List<?> eventListeners,
ListenerInvocationErrorHandler listenerInvocationErrorHandler,
SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
this.eventListenerDelegates = eventListeners;
this.eventListeners = eventListeners.stream()
.map(listener -> listener instanceof EventListener ?
this.eventListeners = eventListeners;
this.wrappedEventListeners = eventListeners.stream()
.map(listener -> listener instanceof EventListener ?
(EventListener) listener :
new AnnotationEventListenerAdapter(listener))
.collect(Collectors.toCollection(ArrayList::new));
.collect(Collectors.toCollection(ArrayList::new));
this.sequencingPolicy = sequencingPolicy;
this.listenerInvocationErrorHandler = listenerInvocationErrorHandler;
}
Expand Down Expand Up @@ -148,28 +148,28 @@ public SimpleEventHandlerInvoker(List<?> eventListeners,
ParameterResolverFactory parameterResolverFactory,
ListenerInvocationErrorHandler listenerInvocationErrorHandler,
SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
this.eventListenerDelegates = eventListeners;
this.eventListeners = eventListeners.stream()
.map(listener -> listener instanceof EventListener ?
this.eventListeners = eventListeners;
this.wrappedEventListeners = eventListeners.stream()
.map(listener -> listener instanceof EventListener ?
(EventListener) listener :
new AnnotationEventListenerAdapter(listener, parameterResolverFactory))
.collect(toCollection(ArrayList::new));
.collect(toCollection(ArrayList::new));
this.sequencingPolicy = sequencingPolicy;
this.listenerInvocationErrorHandler = listenerInvocationErrorHandler;
}

/**
* Gets the list of event listener delegates. This delegates are the end point of event handling.
*
* @return the list ov event listener delegates
* @return the list of event listener delegates
*/
public List<?> eventListeners() {
return Collections.unmodifiableList(eventListenerDelegates);
return Collections.unmodifiableList(eventListeners);
}

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
for (EventListener listener : eventListeners) {
for (EventListener listener : wrappedEventListeners) {
try {
listener.handle(message);
} catch(Exception e) {
Expand All @@ -186,7 +186,7 @@ public boolean canHandle(EventMessage<?> eventMessage, Segment segment) {
}

private boolean hasHandler(EventMessage<?> eventMessage) {
for (EventListener eventListener : eventListeners) {
for (EventListener eventListener : wrappedEventListeners) {
if (eventListener.canHandle(eventMessage)) {
return true;
}
Expand All @@ -196,7 +196,7 @@ private boolean hasHandler(EventMessage<?> eventMessage) {

@Override
public boolean supportsReset() {
for (EventListener eventListener : eventListeners) {
for (EventListener eventListener : wrappedEventListeners) {
if (!eventListener.supportsReset()) {
return false;
}
Expand All @@ -206,7 +206,7 @@ public boolean supportsReset() {

@Override
public void performReset() {
for (EventListener eventListener : eventListeners) {
for (EventListener eventListener : wrappedEventListeners) {
eventListener.prepareReset();
}
}
Expand Down

0 comments on commit 87344e4

Please sign in to comment.