Skip to content

Commit

Permalink
FutureCallback returns the whole CommandResponseMessage now.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1l4n54v1c committed Sep 17, 2018
1 parent 0b76246 commit 65836eb
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 37 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected <C> CommandMessage<C> intercept(CommandMessage<C> command) {
* @param <C> The type of payload of the command * @param <C> The type of payload of the command
* @param <R> The type of result expected from the command handler * @param <R> The type of result expected from the command handler
*/ */
protected <C, R> void doDispatch(CommandMessage<C> command, CommandCallback<? super C, R> callback) { protected <C, R> void doDispatch(CommandMessage<C> command, CommandCallback<? super C, ? super R> callback) {
MessageMonitor.MonitorCallback monitorCallback = messageMonitor.onMessageIngested(command); MessageMonitor.MonitorCallback monitorCallback = messageMonitor.onMessageIngested(command);


MessageHandler<? super CommandMessage<?>> handler = findCommandHandlerFor(command).orElseThrow(() -> { MessageHandler<? super CommandMessage<?>> handler = findCommandHandlerFor(command).orElseThrow(() -> {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.axonframework.commandhandling.GenericCommandResponseMessage.asCommandResponseMessage;


/** /**
* Command Handler Callback that allows the dispatching thread to wait for the result of the callback, using the Future * Command Handler Callback that allows the dispatching thread to wait for the result of the callback, using the Future
Expand All @@ -37,12 +38,13 @@
* @author Allard Buijze * @author Allard Buijze
* @since 0.6 * @since 0.6
*/ */
public class FutureCallback<C, R> extends CompletableFuture<R> implements CommandCallback<C, R> { public class FutureCallback<C, R> extends CompletableFuture<CommandResponseMessage<? extends R>>
implements CommandCallback<C, R> {


@Override @Override
public void onSuccess(CommandMessage<? extends C> commandMessage, public void onSuccess(CommandMessage<? extends C> commandMessage,
CommandResponseMessage<? extends R> commandResponseMessage) { CommandResponseMessage<? extends R> commandResponseMessage) {
super.complete(commandResponseMessage.getPayload()); super.complete(commandResponseMessage);
} }


@Override @Override
Expand All @@ -64,12 +66,12 @@ public void onFailure(CommandMessage commandMessage, Throwable cause) {
* *
* @see #get() * @see #get()
*/ */
public R getResult() { public CommandResponseMessage<? extends R> getResult() {
try { try {
return get(); return get();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return asCommandResponseMessage(null);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw asRuntime(e); throw asRuntime(e);
} }
Expand All @@ -90,14 +92,14 @@ public R getResult() {
* @param unit the time unit of the timeout argument * @param unit the time unit of the timeout argument
* @return the result of the command handler execution. * @return the result of the command handler execution.
*/ */
public R getResult(long timeout, TimeUnit unit) { public CommandResponseMessage<? extends R> getResult(long timeout, TimeUnit unit) {
try { try {
return get(timeout, unit); return get(timeout, unit);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return asCommandResponseMessage(null);
} catch (TimeoutException e) { } catch (TimeoutException e) {
return null; return asCommandResponseMessage(null);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw asRuntime(e); throw asRuntime(e);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ protected AbstractCommandGateway(CommandBus commandBus, RetryScheduler retrySche
* @param callback The callback to notify with the processing result * @param callback The callback to notify with the processing result
* @param <R> The type of response expected from the command * @param <R> The type of response expected from the command
*/ */
protected <C, R> void send(C command, CommandCallback<? super C, R> callback) { protected <C, R> void send(C command, CommandCallback<? super C, ? super R> callback) {
CommandMessage<? extends C> commandMessage = processInterceptors(asCommandMessage(command)); CommandMessage<? extends C> commandMessage = processInterceptors(asCommandMessage(command));
CommandCallback<? super C, R> commandCallback = callback; CommandCallback<? super C, ? super R> commandCallback = callback;
if (retryScheduler != null) { if (retryScheduler != null) {
commandCallback = new RetryingCallback<>(callback, retryScheduler, commandBus); commandCallback = new RetryingCallback<>(callback, retryScheduler, commandBus);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface CommandGateway {
* @param callback The callback to notify when the command has been processed * @param callback The callback to notify when the command has been processed
* @param <R> The type of result expected from command execution * @param <R> The type of result expected from command execution
*/ */
<C, R> void send(C command, CommandCallback<? super C, R> callback); <C, R> void send(C command, CommandCallback<? super C, ? super R> callback);


/** /**
* Sends the given {@code command} and wait for it to execute. The result of the execution is returned when * Sends the given {@code command} and wait for it to execute. The result of the execution is returned when
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.axonframework.commandhandling.callbacks.FutureCallback; import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.annotation.AnnotationUtils; import org.axonframework.common.annotation.AnnotationUtils;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor; import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.annotation.MetaDataValue; import org.axonframework.messaging.annotation.MetaDataValue;
import org.axonframework.messaging.responsetypes.ResponseType; import org.axonframework.messaging.responsetypes.ResponseType;
Expand Down Expand Up @@ -459,7 +460,7 @@ public CompletableFuture<R> invoke(Object proxy, Method invokedMethod, Object[]
} }
callbacks.addAll(commandCallbacks); callbacks.addAll(commandCallbacks);
send(command, new CompositeCallback(callbacks)); send(command, new CompositeCallback(callbacks));
return future; return future.thenApply(Message::getPayload);
} else { } else {
sendAndForget(command); sendAndForget(command);
return null; return null;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.axonframework.commandhandling.CommandMessage; import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.callbacks.FailureLoggingCallback; import org.axonframework.commandhandling.callbacks.FailureLoggingCallback;
import org.axonframework.commandhandling.callbacks.FutureCallback; import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor; import org.axonframework.messaging.MessageDispatchInterceptor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,7 +92,7 @@ public DefaultCommandGateway(CommandBus commandBus, RetryScheduler retrySchedule
} }


@Override @Override
public <C, R> void send(C command, CommandCallback<? super C, R> callback) { public <C, R> void send(C command, CommandCallback<? super C, ? super R> callback) {
super.send(command, callback); super.send(command, callback);
} }


Expand All @@ -109,9 +110,9 @@ public <C, R> void send(C command, CommandCallback<? super C, R> callback) {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <R> R sendAndWait(Object command) { public <R> R sendAndWait(Object command) {
FutureCallback<Object, Object> futureCallback = new FutureCallback<>(); FutureCallback<Object, R> futureCallback = new FutureCallback<>();
send(command, futureCallback); send(command, futureCallback);
return (R) futureCallback.getResult(); return futureCallback.getResult().getPayload();
} }


/** /**
Expand All @@ -132,15 +133,15 @@ public <R> R sendAndWait(Object command) {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <R> R sendAndWait(Object command, long timeout, TimeUnit unit) { public <R> R sendAndWait(Object command, long timeout, TimeUnit unit) {
FutureCallback<Object, Object> futureCallback = new FutureCallback<>(); FutureCallback<Object, R> futureCallback = new FutureCallback<>();
send(command, futureCallback); send(command, futureCallback);
return (R) futureCallback.getResult(timeout, unit); return futureCallback.getResult(timeout, unit).getPayload();
} }


@Override @Override
public <R> CompletableFuture<R> send(Object command) { public <R> CompletableFuture<R> send(Object command) {
FutureCallback<Object, R> callback = new FutureCallback<>(); FutureCallback<Object, R> callback = new FutureCallback<>();
send(command, new FailureLoggingCallback<>(logger, callback)); send(command, new FailureLoggingCallback<>(logger, callback));
return callback; return callback.thenApply(Message::getPayload);
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void setUp() {
@Test @Test
public void testInterceptor() { public void testInterceptor() {
commandGateway.sendAndWait(asCommandMessage(new CreateMyAggregateCommand("id"))); commandGateway.sendAndWait(asCommandMessage(new CreateMyAggregateCommand("id")));
String result = commandGateway.sendAndWait(asCommandMessage(new UpdateMyAggregateStateCommand("id", "state"))); String result = commandGateway
.sendAndWait(asCommandMessage(new UpdateMyAggregateStateCommand("id", "state")));


ArgumentCaptor<EventMessage<?>> eventCaptor = ArgumentCaptor.forClass(EventMessage.class); ArgumentCaptor<EventMessage<?>> eventCaptor = ArgumentCaptor.forClass(EventMessage.class);


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testOnSuccess() throws InterruptedException {
assertTrue(t.isAlive()); assertTrue(t.isAlive());
testSubject.onSuccess(COMMAND_MESSAGE, COMMAND_RESPONSE_MESSAGE); testSubject.onSuccess(COMMAND_MESSAGE, COMMAND_RESPONSE_MESSAGE);
t.join(THREAD_JOIN_TIMEOUT); t.join(THREAD_JOIN_TIMEOUT);
assertEquals("Hello world", resultFromParallelThread); assertEquals(COMMAND_RESPONSE_MESSAGE, resultFromParallelThread);
} }


@SuppressWarnings({"ThrowableInstanceNeverThrown"}) @SuppressWarnings({"ThrowableInstanceNeverThrown"})
Expand Down Expand Up @@ -112,6 +112,6 @@ public void testOnSuccessForLimitedTime_InTime() throws InterruptedException {
testSubject.onSuccess(COMMAND_MESSAGE, COMMAND_RESPONSE_MESSAGE); testSubject.onSuccess(COMMAND_MESSAGE, COMMAND_RESPONSE_MESSAGE);
assertTrue(testSubject.isDone()); assertTrue(testSubject.isDone());
t.join(THREAD_JOIN_TIMEOUT); t.join(THREAD_JOIN_TIMEOUT);
assertEquals("Hello world", resultFromParallelThread); assertEquals(COMMAND_RESPONSE_MESSAGE, resultFromParallelThread);
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void defaultConfigurationWithEventSourcing() throws Exception {


FutureCallback<Object, Object> callback = new FutureCallback<>(); FutureCallback<Object, Object> callback = new FutureCallback<>();
config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback); config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback);
assertEquals("test", callback.get()); assertEquals("test", callback.get().getPayload());
assertNotNull(config.repository(StubAggregate.class)); assertNotNull(config.repository(StubAggregate.class));
assertEquals(2, config.getModules().size()); assertEquals(2, config.getModules().size());
assertExpectedModules(config, assertExpectedModules(config,
Expand Down Expand Up @@ -178,7 +178,7 @@ public void testJpaConfigurationWithInitialTransactionManagerJpaRepository() thr
config.start(); config.start();
FutureCallback<Object, Object> callback = new FutureCallback<>(); FutureCallback<Object, Object> callback = new FutureCallback<>();
config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback); config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback);
assertEquals("test", callback.get()); assertEquals("test", callback.get().getPayload());
assertNotNull(config.repository(StubAggregate.class)); assertNotNull(config.repository(StubAggregate.class));
assertEquals(2, config.getModules().size()); assertEquals(2, config.getModules().size());
assertExpectedModules(config, assertExpectedModules(config,
Expand All @@ -202,7 +202,7 @@ public void testJpaConfigurationWithInitialTransactionManagerJpaRepositoryFromCo
config.start(); config.start();
FutureCallback<Object, Object> callback = new FutureCallback<>(); FutureCallback<Object, Object> callback = new FutureCallback<>();
config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback); config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback);
assertEquals("test", callback.get()); assertEquals("test", callback.get().getPayload());
assertNotNull(config.repository(StubAggregate.class)); assertNotNull(config.repository(StubAggregate.class));
assertTrue(config.getModules() assertTrue(config.getModules()
.stream() .stream()
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testJpaConfigurationWithJpaRepository() throws Exception {
config.start(); config.start();
FutureCallback<Object, Object> callback = new FutureCallback<>(); FutureCallback<Object, Object> callback = new FutureCallback<>();
config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback); config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback);
assertEquals("test", callback.get()); assertEquals("test", callback.get().getPayload());
assertNotNull(config.repository(StubAggregate.class)); assertNotNull(config.repository(StubAggregate.class));
assertEquals(2, config.getModules().size()); assertEquals(2, config.getModules().size());
assertExpectedModules(config, assertExpectedModules(config,
Expand All @@ -273,7 +273,7 @@ public void defaultConfigurationWithMonitors() throws Exception {


FutureCallback<Object, Object> callback = new FutureCallback<>(); FutureCallback<Object, Object> callback = new FutureCallback<>();
config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback); config.commandBus().dispatch(GenericCommandMessage.asCommandMessage("test"), callback);
assertEquals("test", callback.get()); assertEquals("test", callback.get().getPayload());
assertEquals(1, defaultMonitor.getMessages().size()); assertEquals(1, defaultMonitor.getMessages().size());
assertEquals(1, commandBusMonitor.getMessages().size()); assertEquals(1, commandBusMonitor.getMessages().size());
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public void testConnectAndDispatchMessages_Balanced() throws Exception {
} }
callbacks.add(callback); callbacks.add(callback);
} }
for (FutureCallback callback : callbacks) { for (FutureCallback<?, ?> callback : callbacks) {
assertEquals("The Reply!", callback.get()); assertEquals("The Reply!", callback.get().getPayload());
} }
assertEquals(100, counter1.get() + counter2.get()); assertEquals(100, counter1.get() + counter2.get());
System.out.println("Node 1 got " + counter1.get()); System.out.println("Node 1 got " + counter1.get());
Expand Down Expand Up @@ -200,10 +200,10 @@ public void testRingsProperlySynchronized_ChannelAlreadyConnected() throws Excep
FutureCallback<Object, Object> callback4 = new FutureCallback<>(); FutureCallback<Object, Object> callback4 = new FutureCallback<>();
dcb2.dispatch(new GenericCommandMessage<>(1L), callback4); dcb2.dispatch(new GenericCommandMessage<>(1L), callback4);


assertEquals("The Reply!", callback1.get()); assertEquals("The Reply!", callback1.get().getPayload());
assertEquals("The Reply!", callback2.get()); assertEquals("The Reply!", callback2.get().getPayload());
assertEquals("The Reply!", callback3.get()); assertEquals("The Reply!", callback3.get().getPayload());
assertEquals("The Reply!", callback4.get()); assertEquals("The Reply!", callback4.get().getPayload());


assertEquals(connector1.getConsistentHash(), connector2.getConsistentHash()); assertEquals(connector1.getConsistentHash(), connector2.getConsistentHash());
} }
Expand Down Expand Up @@ -311,8 +311,8 @@ public void testConnectAndDispatchMessages_SingleCandidate() throws Exception {
} }
callbacks.add(callback); callbacks.add(callback);
} }
for (FutureCallback callback : callbacks) { for (FutureCallback<?, ?> callback : callbacks) {
assertEquals("The Reply!", callback.get()); assertEquals("The Reply!", callback.get().getPayload());
} }
assertEquals(100, counter1.get() + counter2.get()); assertEquals(100, counter1.get() + counter2.get());
System.out.println("Node 1 got " + counter1.get()); System.out.println("Node 1 got " + counter1.get());
Expand Down Expand Up @@ -357,7 +357,7 @@ public void testUnserializableResponseConvertedToNull() throws Exception {


callback = new FutureCallback<>(); callback = new FutureCallback<>();
dcb1.dispatch(new GenericCommandMessage<>("string"), callback); dcb1.dispatch(new GenericCommandMessage<>("string"), callback);
assertNull(callback.getResult()); assertNull(callback.getResult().getPayload());
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down Expand Up @@ -395,8 +395,8 @@ public void testConnectAndDispatchMessages_CustomCommandName() throws Exception
} }
callbacks.add(callback); callbacks.add(callback);
} }
for (FutureCallback callback : callbacks) { for (FutureCallback<?, ?> callback : callbacks) {
assertEquals("The Reply!", callback.get()); assertEquals("The Reply!", callback.get().getPayload());
} }
assertEquals(100, counter1.get() + counter2.get()); assertEquals(100, counter1.get() + counter2.get());
System.out.println("Node 1 got " + counter1.get()); System.out.println("Node 1 got " + counter1.get());
Expand Down

0 comments on commit 65836eb

Please sign in to comment.