Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

CommandGateways can now be configured with Callbacks using the factory

This is useful when a callback needs to be invoked for each command sent. With
this mechanism, that callback doesn't have to be passed via each method
invocation on the gateway.
If the generic type of the callback is detected to be incompatible with the
actual result of a command, the callback is ignored for that result.

Issue #AXON-242 Fixed
  • Loading branch information...
commit d5f85262b08e5844d4cd1aa111eb54abe3268fe1 1 parent 16a498c
@abuijze abuijze authored
View
25 core/src/main/java/org/axonframework/commandhandling/gateway/CommandGatewayFactoryBean.java
@@ -17,6 +17,7 @@
package org.axonframework.commandhandling.gateway;
import org.axonframework.commandhandling.CommandBus;
+import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandDispatchInterceptor;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
@@ -46,6 +47,7 @@
private CommandBus commandBus;
private RetryScheduler retryScheduler;
private List<CommandDispatchInterceptor> dispatchInterceptors = Collections.emptyList();
+ private List<CommandCallback<?>> commandCallbacks = Collections.emptyList();
private T gateway;
private Class<T> gatewayInterface;
@@ -70,8 +72,11 @@ public void afterPropertiesSet() throws Exception {
if (commandBus == null) {
throw new AxonConfigurationException("CommandBus may not be null");
}
- gateway = (T) new GatewayProxyFactory(commandBus, retryScheduler, dispatchInterceptors)
- .createGateway(gatewayInterface == null ? CommandGateway.class : gatewayInterface);
+ final GatewayProxyFactory factory = new GatewayProxyFactory(commandBus, retryScheduler, dispatchInterceptors);
+ for (CommandCallback<?> commandCallback : commandCallbacks) {
+ factory.registerCommandCallback(commandCallback);
+ }
+ gateway = (T) factory.createGateway(gatewayInterface == null ? CommandGateway.class : gatewayInterface);
}
/**
@@ -100,7 +105,7 @@ public void setRetryScheduler(RetryScheduler retryScheduler) {
*
* @param gatewayInterface The interface describing the gateway
* @throws IllegalArgumentException if the given <code>gatewayInterface</code> is <code>null</code> or not an
- * interface.
+ * interface.
*/
public void setGatewayInterface(Class<T> gatewayInterface) {
Assert.notNull(gatewayInterface, "The given gateway interface may not be null");
@@ -135,4 +140,18 @@ public void setCommandDispatchInterceptors(CommandDispatchInterceptor... command
public void setCommandDispatchInterceptors(List<CommandDispatchInterceptor> commandDispatchInterceptors) {
this.dispatchInterceptors = commandDispatchInterceptors;
}
+
+ /**
+ * Registers the <code>commandCallbacks</code>, which are invoked for each sent command, unless Axon is able to detect
+ * that the result of the command does not match the type accepted by that callback.
+ * <p/>
+ * Axon will check the signature of the onSuccess() method and only invoke the callback if the actual result of the
+ * command is an instance of that type. If Axon is unable to detect the type, the callback is always invoked,
+ * potentially causing {@link java.lang.ClassCastException}.
+ *
+ * @param commandCallbacks The callbacks to register
+ */
+ public void setCommandCallbacks(List<CommandCallback<?>> commandCallbacks) {
+ this.commandCallbacks = commandCallbacks;
+ }
}
View
173 core/src/main/java/org/axonframework/commandhandling/gateway/GatewayProxyFactory.java
@@ -23,17 +23,19 @@
import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.common.Assert;
import org.axonframework.common.CollectionUtils;
+import org.axonframework.common.ReflectionUtils;
import org.axonframework.common.annotation.MetaData;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -88,7 +90,7 @@
* </ul>
* In other cases, the method is non-blocking and will return immediately after dispatching a command.
* <p/>
- * This factory is thread safe, and so are the gateways it creates.
+ * This factory is thread safe once configured, and so are the gateways it creates.
*
* @author Allard Buijze
* @since 2.0
@@ -98,6 +100,7 @@
private final CommandBus commandBus;
private final RetryScheduler retryScheduler;
private final List<CommandDispatchInterceptor> dispatchInterceptors;
+ private final List<CommandCallback<?>> commandCallbacks;
/**
* Initialize the factory sending Commands to the given <code>commandBus</code>, optionally intercepting them with
@@ -150,10 +153,11 @@ public GatewayProxyFactory(CommandBus commandBus, RetryScheduler retryScheduler,
this.retryScheduler = retryScheduler;
this.commandBus = commandBus;
if (commandDispatchInterceptors != null && !commandDispatchInterceptors.isEmpty()) {
- this.dispatchInterceptors = new ArrayList<CommandDispatchInterceptor>(commandDispatchInterceptors);
+ this.dispatchInterceptors = new CopyOnWriteArrayList<CommandDispatchInterceptor>(commandDispatchInterceptors);
} else {
- this.dispatchInterceptors = Collections.emptyList();
+ this.dispatchInterceptors = new CopyOnWriteArrayList<CommandDispatchInterceptor>();
}
+ this.commandCallbacks = new CopyOnWriteArrayList<CommandCallback<?>>();
}
/**
@@ -173,7 +177,8 @@ public GatewayProxyFactory(CommandBus commandBus, RetryScheduler retryScheduler,
final Class<?>[] arguments = gatewayMethod.getParameterTypes();
InvocationHandler dispatcher = new DispatchOnInvocationHandler(commandBus, retryScheduler,
- dispatchInterceptors, extractors, true);
+ dispatchInterceptors, extractors,
+ commandCallbacks, true);
if (!Future.class.equals(gatewayMethod.getReturnType())) {
// no wrapping
if (arguments.length >= 3
@@ -192,11 +197,11 @@ public GatewayProxyFactory(CommandBus commandBus, RetryScheduler retryScheduler,
} else if (!Void.TYPE.equals(gatewayMethod.getReturnType())
|| gatewayMethod.getExceptionTypes().length > 0) {
dispatcher = wrapToWaitForResult(dispatcher);
- } else if (!hasCallbackParameters(gatewayMethod)) {
+ } else if (commandCallbacks.isEmpty() && !hasCallbackParameters(gatewayMethod)) {
// switch to fire-and-forget mode
- dispatcher = wrapToFireAndForget(new DispatchOnInvocationHandler(commandBus, retryScheduler,
- dispatchInterceptors,
- extractors, false));
+ dispatcher = wrapToFireAndForget(new DispatchOnInvocationHandler(
+ commandBus, retryScheduler, dispatchInterceptors, extractors,
+ commandCallbacks, false));
}
}
Class<?>[] declaredExceptions = gatewayMethod.getExceptionTypes();
@@ -329,6 +334,69 @@ private boolean contains(Class<?>[] declaredExceptions, Class<?> exceptionClass)
return false;
}
+ /**
+ * Registers the <code>callback</code>, which is invoked for each sent command, unless Axon is able to detect that
+ * the result of the command does not match the type accepted by the callback.
+ * <p/>
+ * Axon will check the signature of the onSuccess() method and only invoke the callback if the actual result of the
+ * command is an instance of that type. If Axon is unable to detect the type, the callback is always invoked,
+ * potentially causing {@link java.lang.ClassCastException}.
+ *
+ * @param callback The callback to register
+ * @param <R> The type of return value the callback is interested in
+ * @return this instance for further configuration
+ */
+ public <R> GatewayProxyFactory registerCommandCallback(CommandCallback<R> callback) {
+ this.commandCallbacks.add(new TypeSafeCallbackWrapper<R>(callback));
+ return this;
+ }
+
+ /**
+ * Registers the given <code>dispatchInterceptor</code> which is invoked for each Command dispatched through the
+ * Command Gateways created by this factory.
+ *
+ * @param dispatchInterceptor The interceptor to register.
+ * @return this instance for further configuration
+ */
+ public GatewayProxyFactory registerDispatchInterceptor(CommandDispatchInterceptor dispatchInterceptor) {
+ this.dispatchInterceptors.add(dispatchInterceptor);
+ return this;
+ }
+
+ private MetaDataExtractor[] extractMetaData(Annotation[][] parameterAnnotations) {
+ List<MetaDataExtractor> extractors = new ArrayList<MetaDataExtractor>();
+ for (int i = 0; i < parameterAnnotations.length; i++) {
+ Annotation[] annotations = parameterAnnotations[i];
+ final MetaData metaDataAnnotation = CollectionUtils.getAnnotation(annotations, MetaData.class);
+ if (metaDataAnnotation != null) {
+ extractors.add(new MetaDataExtractor(i, metaDataAnnotation.value()));
+ }
+ }
+ return extractors.toArray(new MetaDataExtractor[extractors.size()]);
+ }
+
+ /**
+ * Interface towards the mechanism that handles a method call on a gateway interface method.
+ *
+ * @param <R> The return type of the method invocation
+ */
+ public interface InvocationHandler<R> {
+
+ /**
+ * Handle the invocation of the given <code>invokedMethod</code>, invoked on given <code>proxy</code> with
+ * given
+ * <code>args</code>.
+ *
+ * @param proxy The proxy on which the method was invoked
+ * @param invokedMethod The method being invoked
+ * @param args The arguments of the invocation
+ * @return the return value of the invocation
+ *
+ * @throws Throwable any exceptions that occurred while processing the invocation
+ */
+ R invoke(Object proxy, Method invokedMethod, Object[] args) throws Throwable;
+ }
+
private static class GatewayInvocationHandler extends AbstractCommandGateway implements
java.lang.reflect.InvocationHandler {
@@ -352,30 +420,22 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
}
- private MetaDataExtractor[] extractMetaData(Annotation[][] parameterAnnotations) {
- List<MetaDataExtractor> extractors = new ArrayList<MetaDataExtractor>();
- for (int i = 0; i < parameterAnnotations.length; i++) {
- Annotation[] annotations = parameterAnnotations[i];
- final MetaData metaDataAnnotation = CollectionUtils.getAnnotation(annotations, MetaData.class);
- if (metaDataAnnotation != null) {
- extractors.add(new MetaDataExtractor(i, metaDataAnnotation.value()));
- }
- }
- return extractors.toArray(new MetaDataExtractor[extractors.size()]);
- }
-
private static class DispatchOnInvocationHandler<R> extends AbstractCommandGateway
implements InvocationHandler<Future<R>> {
private final MetaDataExtractor[] metaDataExtractors;
- private final boolean useCallbacks;
+ private final List<CommandCallback<? super R>> commandCallbacks;
+ private final boolean forceCallbacks;
protected DispatchOnInvocationHandler(CommandBus commandBus, RetryScheduler retryScheduler,
List<CommandDispatchInterceptor> commandDispatchInterceptors,
- MetaDataExtractor[] metaDataExtractors, boolean useCallbacks) {
+ MetaDataExtractor[] metaDataExtractors,
+ List<CommandCallback<? super R>> commandCallbacks,
+ boolean forceCallbacks) {
super(commandBus, retryScheduler, commandDispatchInterceptors);
this.metaDataExtractors = metaDataExtractors; // NOSONAR
- this.useCallbacks = useCallbacks;
+ this.commandCallbacks = commandCallbacks;
+ this.forceCallbacks = forceCallbacks;
}
@SuppressWarnings("unchecked")
@@ -391,8 +451,8 @@ protected DispatchOnInvocationHandler(CommandBus commandBus, RetryScheduler retr
command = asCommandMessage(command).withMetaData(metaDataValues);
}
}
- if (useCallbacks) {
- List<CommandCallback<R>> callbacks = new LinkedList<CommandCallback<R>>();
+ if (forceCallbacks || !commandCallbacks.isEmpty()) {
+ List<CommandCallback<? super R>> callbacks = new LinkedList<CommandCallback<? super R>>();
FutureCallback<R> future = new FutureCallback<R>();
callbacks.add(future);
for (Object arg : args) {
@@ -401,6 +461,7 @@ protected DispatchOnInvocationHandler(CommandBus commandBus, RetryScheduler retr
callbacks.add(callback);
}
}
+ callbacks.addAll(commandCallbacks);
send(command, new CompositeCallback(callbacks));
return future;
} else {
@@ -412,16 +473,16 @@ protected DispatchOnInvocationHandler(CommandBus commandBus, RetryScheduler retr
private static class CompositeCallback<R> implements CommandCallback<R> {
- private final List<CommandCallback<R>> callbacks;
+ private final List<CommandCallback<? super R>> callbacks;
@SuppressWarnings("unchecked")
- public CompositeCallback(List<CommandCallback<R>> callbacks) {
- this.callbacks = new ArrayList<CommandCallback<R>>(callbacks);
+ public CompositeCallback(List<CommandCallback<? super R>> callbacks) {
+ this.callbacks = new ArrayList<CommandCallback<? super R>>(callbacks);
}
@Override
public void onSuccess(R result) {
- for (CommandCallback<R> callback : callbacks) {
+ for (CommandCallback<? super R> callback : callbacks) {
callback.onSuccess(result);
}
}
@@ -587,25 +648,39 @@ public void addMetaData(Object[] args, Map<String, Object> metaData) {
}
}
- /**
- * Interface towards the mechanism that handles a method call on a gateway interface method.
- *
- * @param <R> The return type of the method invocation
- */
- public interface InvocationHandler<R> {
+ private static class TypeSafeCallbackWrapper<R> implements CommandCallback<Object> {
- /**
- * Handle the invocation of the given <code>invokedMethod</code>, invoked on given <code>proxy</code> with
- * given
- * <code>args</code>.
- *
- * @param proxy The proxy on which the method was invoked
- * @param invokedMethod The method being invoked
- * @param args The arguments of the invocation
- * @return the return value of the invocation
- *
- * @throws Throwable any exceptions that occurred while processing the invocation
- */
- R invoke(Object proxy, Method invokedMethod, Object[] args) throws Throwable;
+ private final CommandCallback<R> delegate;
+ private final Class<R> parameterType;
+
+ @SuppressWarnings("unchecked")
+ public TypeSafeCallbackWrapper(CommandCallback<R> delegate) {
+ this.delegate = delegate;
+ Class discoveredParameterType = Object.class;
+ for (Method m : ReflectionUtils.methodsOf(delegate.getClass())) {
+ if (m.getGenericParameterTypes().length == 1
+ && m.getGenericParameterTypes()[0] != Object.class
+ && "onSuccess".equals(m.getName())
+ && Modifier.isPublic(m.getModifiers())) {
+ discoveredParameterType = m.getParameterTypes()[0];
+ if (discoveredParameterType != Object.class) {
+ break;
+ }
+ }
+ }
+ parameterType = discoveredParameterType;
+ }
+
+ @Override
+ public void onSuccess(Object result) {
+ if (parameterType.isInstance(result)) {
+ delegate.onSuccess(parameterType.cast(result));
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ delegate.onFailure(cause);
+ }
}
}
View
59 core/src/test/java/org/axonframework/commandhandling/gateway/GatewayProxyFactoryTest.java
@@ -25,8 +25,10 @@
import org.axonframework.unitofwork.DefaultUnitOfWork;
import org.axonframework.unitofwork.UnitOfWork;
import org.hamcrest.Description;
-import org.junit.*;
-import org.junit.internal.matchers.*;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.mockito.invocation.*;
import org.mockito.stubbing.*;
@@ -49,21 +51,33 @@
private static GatewayProxyFactory testSubject;
private static CompleteGateway gateway;
private static RetryScheduler mockRetryScheduler;
+ private static CommandCallback callback;
@BeforeClass
public static void beforeClass() {
mockCommandBus = mock(CommandBus.class);
mockRetryScheduler = mock(RetryScheduler.class);
testSubject = new GatewayProxyFactory(mockCommandBus, mockRetryScheduler);
+ callback = spy(new StringCommandCallback());
+ testSubject.registerCommandCallback(new CommandCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ }
+ });
+ testSubject.registerCommandCallback(callback);
gateway = testSubject.createGateway(CompleteGateway.class);
}
@Before
public void setUp() {
- reset(mockCommandBus, mockRetryScheduler);
+ reset(mockCommandBus, mockRetryScheduler, callback);
}
- @Test(timeout = 2000)
+ @Test//(timeout = 2000)
public void testGateway_FireAndForget() {
final Object metaTest = new Object();
gateway.fireAndForget("Command", metaTest, "value");
@@ -135,6 +149,7 @@ public void run() {
assertTrue("Expected command bus to be invoked", cdl.await(1, TimeUnit.SECONDS));
t.join();
assertEquals("ReturnValue", result.get());
+ verify(callback).onSuccess("ReturnValue");
}
@Test(timeout = 2000)
@@ -160,6 +175,7 @@ public void run() {
assertNull("Did not expect ReturnValue", result.get());
assertTrue(error.get() instanceof CommandExecutionException);
assertTrue(error.get().getCause() instanceof ExpectedException);
+ verify(callback).onFailure(isA(ExpectedException.class));
}
@Test(timeout = 2000)
@@ -312,6 +328,7 @@ public void run() {
t.join();
assertNull("Did not expect ReturnValue", result.get());
assertTrue(error.get() instanceof ExpectedException);
+ verify(callback).onFailure(isA(ExpectedException.class));
}
@Test(timeout = 2000)
@@ -481,6 +498,24 @@ public void testCreateGateway_AsyncWithCallbacks_Success() {
}
@Test(timeout = 2000)
+ public void testCreateGateway_AsyncWithCallbacks_Success_ButReturnTypeDoesntMatchCallback() {
+ CountDownLatch cdl = new CountDownLatch(1);
+
+ final CommandCallback callback1 = mock(CommandCallback.class);
+ final CommandCallback callback2 = mock(CommandCallback.class);
+
+ doAnswer(new Success(cdl, 42))
+ .when(mockCommandBus).dispatch(isA(CommandMessage.class), isA(CommandCallback.class));
+
+ gateway.fireAsyncWithCallbacks("Command", callback1, callback2);
+ assertEquals(0, cdl.getCount());
+
+ verify(callback1).onSuccess(42);
+ verify(callback2).onSuccess(42);
+ verify(callback, never()).onSuccess(anyObject());
+ }
+
+ @Test(timeout = 2000)
public void testCreateGateway_AsyncWithCallbacks_Failure() {
final CommandCallback callback1 = mock(CommandCallback.class);
final CommandCallback callback2 = mock(CommandCallback.class);
@@ -575,9 +610,9 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
private static class Success implements Answer {
private final CountDownLatch cdl;
- private final String returnValue;
+ private final Object returnValue;
- public Success(CountDownLatch cdl, String returnValue) {
+ public Success(CountDownLatch cdl, Object returnValue) {
this.cdl = cdl;
this.returnValue = returnValue;
}
@@ -590,6 +625,18 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
}
+ public static class StringCommandCallback implements CommandCallback<String> {
+
+ @Override
+ public void onSuccess(String result) {
+ System.out.println("YAYYYYYYYYYYYYYY");
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ }
+ }
+
private class Failure implements Answer {
private final CountDownLatch cdl;

0 comments on commit d5f8526

Please sign in to comment.
Something went wrong with that request. Please try again.