Skip to content

Commit

Permalink
Added default CorrelationDataInterceptors to the Configurer
Browse files Browse the repository at this point in the history
By default, Axon will add a traceId and correlationId to each message when components are configured using the Configuration API.
  • Loading branch information
abuijze committed Nov 8, 2016
1 parent 9ff7e8d commit 0e141d3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 62 deletions.
Expand Up @@ -36,7 +36,6 @@
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.annotation.ClasspathParameterResolverFactory;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
Expand Down Expand Up @@ -82,8 +81,6 @@ public class DefaultConfigurer implements Configurer {
private final Configuration config = new ConfigurationImpl();

private final Component<BiFunction<Class<?>, String, MessageMonitor<Message<?>>>> messageMonitorFactory = new Component<>(config, "monitorFactory", (c) -> (type, name) -> NoOpMessageMonitor.instance());
private final Component<MessageHandlerInterceptor<Message<?>>> interceptor = new Component<>(config, "correlationInterceptor",
c -> new CorrelationDataInterceptor<>());
private final Component<List<CorrelationDataProvider>> correlationProviders = new Component<>(config, "correlationProviders",
c -> asList(msg -> singletonMap("correlationId", msg.getIdentifier()),
msg -> singletonMap("traceId", msg.getMetaData().getOrDefault("traceId", msg.getIdentifier()))
Expand Down Expand Up @@ -162,7 +159,7 @@ protected ParameterResolverFactory defaultParameterResolverFactory(Configuration
*/
protected CommandBus defaultCommandBus(Configuration config) {
SimpleCommandBus cb = new SimpleCommandBus(config.messageMonitor(SimpleCommandBus.class, "commandBus"));
cb.setHandlerInterceptors(singletonList(interceptor.get()));
cb.setHandlerInterceptors(singletonList(new CorrelationDataInterceptor<>(config.correlationDataProviders())));
DefaultUnitOfWorkFactory unitOfWorkFactory = new DefaultUnitOfWorkFactory(config.getComponent(TransactionManager.class));
config.correlationDataProviders().forEach(unitOfWorkFactory::registerCorrelationDataProvider);
cb.setUnitOfWorkFactory(unitOfWorkFactory);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;

import java.util.*;
import java.util.function.Function;
Expand Down Expand Up @@ -58,14 +57,16 @@ public EventHandlingConfiguration() {
}

private SubscribingEventProcessor defaultEventProcessor(Configuration conf, String name, List<?> eh) {
return new SubscribingEventProcessor(name,
new SimpleEventHandlerInvoker(eh,
conf.getComponent(
ListenerErrorHandler.class,
LoggingListenerErrorHandler::new)),
conf.eventBus(),
conf.messageMonitor(SubscribingEventProcessor.class,
name));
SubscribingEventProcessor processor = new SubscribingEventProcessor(name,
new SimpleEventHandlerInvoker(eh,
conf.getComponent(
ListenerErrorHandler.class,
LoggingListenerErrorHandler::new)),
conf.eventBus(),
conf.messageMonitor(SubscribingEventProcessor.class,
name));
processor.registerInterceptor(new CorrelationDataInterceptor<>(conf.correlationDataProviders()));
return processor;
}

/**
Expand Down Expand Up @@ -106,8 +107,7 @@ private EventProcessor buildTrackingEventProcessor(Configuration conf, String na
NoTransactionManager::instance),
conf.messageMonitor(EventProcessor.class,
name));
CorrelationDataInterceptor<EventMessage<?>> interceptor = new CorrelationDataInterceptor<>();
interceptor.registerCorrelationDataProviders(conf.correlationDataProviders());
CorrelationDataInterceptor<EventMessage<?>> interceptor = new CorrelationDataInterceptor<>(conf.correlationDataProviders());
processor.registerInterceptor(interceptor);
return processor;
}
Expand Down
Expand Up @@ -16,15 +16,15 @@

package org.axonframework.messaging.interceptors;

import org.axonframework.common.Registration;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.unitofwork.UnitOfWork;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.List;

/**
* Message interceptor that registers {@link CorrelationDataProvider CorrelationDataProviders} with the Unit of Work.
Expand All @@ -38,35 +38,21 @@
*/
public class CorrelationDataInterceptor<T extends Message<?>> implements MessageHandlerInterceptor<T> {

private final Collection<CorrelationDataProvider> correlationDataProviders = new CopyOnWriteArraySet<>();

@Override
public Object handle(UnitOfWork<? extends T> unitOfWork, InterceptorChain interceptorChain) throws Exception {
correlationDataProviders.forEach(unitOfWork::registerCorrelationDataProvider);
return interceptorChain.proceed();
}
private final List<CorrelationDataProvider> correlationDataProviders;

/**
* Registers given {@code correlationDataProvider} with the Interceptor. The provider will be registered with
* the Unit of Work each time a Message is intercepted that is to be processed.
* Initializes the interceptor that registers given {@code correlationDataProvider} with the current Unit of Work.
*
* @param correlationDataProvider The CorrelationDataProvider to register with the Interceptor
* @return a handle to cancel the registration. After cancellation the given {@code correlationDataProvider}
* will no longer be registered with new Units of Work by this interceptor.
* @param correlationDataProviders The CorrelationDataProviders to register with the Interceptor
*/
public Registration registerCorrelationDataProvider(CorrelationDataProvider correlationDataProvider) {
correlationDataProviders.add(correlationDataProvider);
return () -> correlationDataProviders.remove(correlationDataProvider);
public CorrelationDataInterceptor(Collection<CorrelationDataProvider> correlationDataProviders) {
this.correlationDataProviders = new ArrayList<>(correlationDataProviders);
}

/**
* Registers given {@code correlationDataProviders} with the Interceptor. The providers will be registered with
* the Unit of Work each time a Message is intercepted that is to be processed.
*
* @param correlationDataProviders The CorrelationDataProviders to register with the Interceptor
*/
public void registerCorrelationDataProviders(Collection<CorrelationDataProvider> correlationDataProviders) {
this.correlationDataProviders.addAll(correlationDataProviders);
@Override
public Object handle(UnitOfWork<? extends T> unitOfWork, InterceptorChain interceptorChain) throws Exception {
correlationDataProviders.forEach(unitOfWork::registerCorrelationDataProvider);
return interceptorChain.proceed();
}

}
Expand Up @@ -16,15 +16,17 @@

package org.axonframework.messaging.interceptors;

import org.axonframework.common.Registration;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.junit.Before;
import org.junit.Test;

import static org.mockito.Mockito.*;
import java.util.Arrays;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
* @author Rene de Waele
Expand All @@ -34,39 +36,24 @@ public class CorrelationDataInterceptorTest {
private CorrelationDataInterceptor<Message<?>> subject;
private UnitOfWork<Message<?>> mockUnitOfWork;
private InterceptorChain mockInterceptorChain;
private CorrelationDataProvider mockProvider1;
private CorrelationDataProvider mockProvider2;

@Before
@SuppressWarnings("unchecked")
public void setUp() {
subject = new CorrelationDataInterceptor<>();
mockProvider1 = mock(CorrelationDataProvider.class);
mockProvider2 = mock(CorrelationDataProvider.class);
subject = new CorrelationDataInterceptor<>(Arrays.asList(mockProvider1, mockProvider2));
mockUnitOfWork = mock(UnitOfWork.class);
mockInterceptorChain = mock(InterceptorChain.class);
}

@Test
public void testAttachesCorrelationDataProvidersToUnitOfWork() throws Exception {
CorrelationDataProvider mockProvider1 = mock(CorrelationDataProvider.class);
CorrelationDataProvider mockProvider2 = mock(CorrelationDataProvider.class);
subject.registerCorrelationDataProvider(mockProvider1);
subject.registerCorrelationDataProvider(mockProvider2);
subject.handle(mockUnitOfWork, mockInterceptorChain);
verify(mockUnitOfWork).registerCorrelationDataProvider(mockProvider1);
verify(mockUnitOfWork).registerCorrelationDataProvider(mockProvider2);
verify(mockInterceptorChain).proceed();
}

@Test
public void testUnregisteredProviderIsNoLongerAttachedToUnitOfWork() throws Exception {
CorrelationDataProvider mockProvider = mock(CorrelationDataProvider.class);
Registration registration = subject.registerCorrelationDataProvider(mockProvider);
subject.handle(mockUnitOfWork, mockInterceptorChain);
verify(mockUnitOfWork).registerCorrelationDataProvider(mockProvider);
verify(mockInterceptorChain).proceed();
registration.cancel();
reset((Object) mockInterceptorChain);
subject.handle(mockUnitOfWork, mockInterceptorChain);
verifyNoMoreInteractions(mockUnitOfWork);
verify(mockInterceptorChain).proceed();
}

}

0 comments on commit 0e141d3

Please sign in to comment.