Skip to content

Commit

Permalink
MessageDispatchInterceptor.java now takes a BiFunction of <message_in…
Browse files Browse the repository at this point in the history
…dex, messag> instead of only an index
  • Loading branch information
Steven van Beelen committed Jun 24, 2016
1 parent ef838a0 commit 13a9db3
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 39 deletions.
Expand Up @@ -15,8 +15,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.axonframework.messaging.unitofwork.UnitOfWork.Phase.*;

Expand Down Expand Up @@ -149,9 +149,9 @@ public void publish(List<? extends EventMessage<?>> events) {
protected List<? extends EventMessage<?>> intercept(List<? extends EventMessage<?>> events) {
List<EventMessage<?>> preprocessedEvents = new ArrayList<>(events);
for (MessageDispatchInterceptor<EventMessage<?>> preprocessor : dispatchInterceptors) {
Function<Integer, EventMessage<?>> function = preprocessor.handle(preprocessedEvents);
BiFunction<Integer, EventMessage<?>, EventMessage<?>> function = preprocessor.handle(preprocessedEvents);
for (int i = 0; i < preprocessedEvents.size(); i++) {
preprocessedEvents.set(i, function.apply(i));
preprocessedEvents.set(i, function.apply(i, preprocessedEvents.get(i)));
}
}
return preprocessedEvents;
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public EventLoggingInterceptor(String loggerName) {
}

@Override
public Function<Integer, EventMessage<?>> handle(List<EventMessage<?>> messages) {
public BiFunction<Integer, EventMessage<?>, EventMessage<?>> handle(List<EventMessage<?>> messages) {
StringBuilder sb = new StringBuilder(String.format("Events published: [%s]",
messages.stream().map(m -> m.getPayloadType().getSimpleName()).collect(Collectors.joining(", "))));
if (CurrentUnitOfWork.isStarted()) {
Expand All @@ -86,6 +86,6 @@ public Function<Integer, EventMessage<?>> handle(List<EventMessage<?>> messages)
}
}
logger.info(sb.toString());
return messages::get;
return (i, m) -> m;
}
}
Expand Up @@ -18,7 +18,7 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.BiFunction;

/**
* Interceptor that allows messages to be intercepted and modified before they are dispatched. This interceptor
Expand All @@ -38,7 +38,7 @@ public interface MessageDispatchInterceptor<T extends Message<?>> {
* @return the message to dispatch
*/
default T handle(T message) {
return handle(Collections.singletonList(message)).apply(0);
return handle(Collections.singletonList(message)).apply(0, message);
}

/**
Expand All @@ -51,6 +51,6 @@ default T handle(T message) {
* @param messages The Messages to pre-process
* @return a function that processes messages based on their position in the list
*/
Function<Integer, T> handle(List<T> messages);
BiFunction<Integer, T, T> handle(List<T> messages);

}
Expand Up @@ -28,7 +28,7 @@
import javax.validation.ValidatorFactory;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.BiFunction;

/**
* Interceptor that applies JSR303 bean validation on incoming messages. When validation on a message fails, a
Expand Down Expand Up @@ -69,9 +69,8 @@ public Object handle(UnitOfWork<? extends T> unitOfWork, InterceptorChain interc
}

@Override
public Function<Integer, T> handle(List<T> messages) {
return index -> {
T message = messages.get(index);
public BiFunction<Integer, T, T> handle(List<T> messages) {
return (index, message) -> {
Validator validator = validatorFactory.getValidator();
Set<ConstraintViolation<Object>> violations = validateMessage(message.getPayload(), validator);
if (violations != null && !violations.isEmpty()) {
Expand Down
Expand Up @@ -33,7 +33,7 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.BiFunction;

import static junit.framework.TestCase.assertEquals;

Expand Down Expand Up @@ -90,10 +90,10 @@ public void testCommandDipatchInterceptorExceptionOnRetryThreadIsThrownToCaller(
final Thread testThread = Thread.currentThread();
commandBus.setDispatchInterceptors(Collections.singletonList(new MessageDispatchInterceptor<CommandMessage<?>>() {
@Override
public Function<Integer, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return (index) -> {
public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return (index, message) -> {
if (Thread.currentThread() == testThread) {
return messages.get(index); // ok
return message; // ok
} else {
// also, nothing is logged!
LoggerFactory.getLogger(getClass()).info("throwing exception from dispatcher...");
Expand Down Expand Up @@ -124,10 +124,10 @@ public void testCommandGatewayDispatchInterceptorMetaDataIsPreservedOnRetry() {
commandGateway = new DefaultCommandGateway(commandBus, retryScheduler, new MessageDispatchInterceptor<CommandMessage<?>>() {

@Override
public Function<Integer, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return (index) -> {
public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return (index, message) -> {
if (Thread.currentThread() == testThread) {
return messages.get(index).andMetaData(
return message.andMetaData(
Collections.singletonMap("gatewayMetaData", "myUserSession"));
} else {
// gateway interceptor should only be called from the caller's thread
Expand Down Expand Up @@ -172,16 +172,14 @@ public void testCommandBusDispatchInterceptorMetaDataIsNotPreservedOnRetry() {
}
});

commandBus.setDispatchInterceptors(Collections.singletonList(messages -> (index) -> {
commandBus.setDispatchInterceptors(Collections.singletonList(messages -> (index, message) -> {
if (Thread.currentThread() == testThread) {
return messages.get(index).andMetaData(Collections.singletonMap("commandBusMetaData",
"myUserSession"));
return message.andMetaData(Collections.singletonMap("commandBusMetaData", "myUserSession"));
} else {
// say the security interceptor example
// from #testCommandDipatchInterceptorExceptionOnRetryThreadIsThrownToCaller
// has been "fixed" -- on the retry thread, there's no security context
return messages.get(index).andMetaData(Collections.singletonMap("commandBusMetaData",
"noUserSession"));
return message.andMetaData(Collections.singletonMap("commandBusMetaData", "noUserSession"));
}
}));

Expand Down
Expand Up @@ -41,14 +41,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.BiFunction;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.axonframework.test.FixtureTest_CommandInterceptors.InterceptorAggregate.AGGREGATE_IDENTIFIER;
import static org.axonframework.test.FixtureTest_CommandInterceptors.TestCommandDispatchInterceptor.DISPATCH_META_DATA_KEY;
import static org.axonframework.test.FixtureTest_CommandInterceptors.TestCommandDispatchInterceptor.DISPATCH_META_DATA_VALUE;
import static org.axonframework.test.FixtureTest_CommandInterceptors.TestCommandHandlerInterceptor.HANDLER_META_DATA_KEY;
import static org.axonframework.test.FixtureTest_CommandInterceptors.TestCommandHandlerInterceptor.HANDLER_META_DATA_VALUE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
Expand All @@ -57,6 +53,11 @@
@RunWith(MockitoJUnitRunner.class)
public class FixtureTest_CommandInterceptors {

private static final String DISPATCH_META_DATA_KEY = "dispatchKey";
private static final String DISPATCH_META_DATA_VALUE = "dispatchValue";
private static final String HANDLER_META_DATA_KEY = "handlerKey";
private static final String HANDLER_META_DATA_VALUE = "handlerValue";

private FixtureConfiguration<InterceptorAggregate> fixture;

@Mock
Expand Down Expand Up @@ -263,13 +264,9 @@ public Object getAggregateIdentifier() {

class TestCommandDispatchInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {

static final String DISPATCH_META_DATA_KEY = "dispatchKey";
static final String DISPATCH_META_DATA_VALUE = "dispatchValue";

@Override
public Function<Integer, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return index -> {
CommandMessage<?> message = messages.get(index);
public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<CommandMessage<?>> messages) {
return (index, message) -> {
Map<String, Object> testMetaDataMap = new HashMap<>();
testMetaDataMap.put(DISPATCH_META_DATA_KEY, DISPATCH_META_DATA_VALUE);
message = message.andMetaData(testMetaDataMap);
Expand All @@ -281,9 +278,6 @@ public Function<Integer, CommandMessage<?>> handle(List<CommandMessage<?>> messa

class TestCommandHandlerInterceptor implements MessageHandlerInterceptor<CommandMessage<?>> {

static final String HANDLER_META_DATA_KEY = "handlerKey";
static final String HANDLER_META_DATA_VALUE = "handlerValue";

@Override
public Object handle(UnitOfWork<? extends CommandMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
unitOfWork.registerCorrelationDataProvider(new SimpleCorrelationDataProvider(HANDLER_META_DATA_KEY));
Expand Down

0 comments on commit 13a9db3

Please sign in to comment.