Skip to content

Commit

Permalink
Added CommandMessage as parameter to CommandCallback methods
Browse files Browse the repository at this point in the history
This includes a fix of the use of Generics in many cases.

Issue #AXON-303 Fixed
  • Loading branch information
abuijze committed Mar 19, 2015
1 parent 9405489 commit 9f5709a
Show file tree
Hide file tree
Showing 53 changed files with 677 additions and 738 deletions.
Expand Up @@ -59,7 +59,7 @@ public AsynchronousCommandBus(Executor executor) {
} }


@Override @Override
protected <R> void doDispatch(CommandMessage<?> command, CommandCallback<R> callback) { protected <C, R> void doDispatch(CommandMessage<C> command, CommandCallback<? super C, R> callback) {
executor.execute(new DispatchCommand<>(command, callback)); executor.execute(new DispatchCommand<>(command, callback));
} }


Expand All @@ -79,12 +79,12 @@ public void shutdown() {
} }
} }


private final class DispatchCommand<R> implements Runnable { private final class DispatchCommand<C, R> implements Runnable {


private final CommandMessage<?> command; private final CommandMessage<C> command;
private final CommandCallback<R> callback; private final CommandCallback<? super C, R> callback;


public DispatchCommand(CommandMessage<?> command, CommandCallback<R> callback) { public DispatchCommand(CommandMessage<C> command, CommandCallback<? super C, R> callback) {
this.command = command; this.command = command;
this.callback = callback; this.callback = callback;
} }
Expand Down
Expand Up @@ -16,6 +16,8 @@


package org.axonframework.commandhandling; package org.axonframework.commandhandling;


import org.axonframework.commandhandling.callbacks.LoggingCallback;

/** /**
* The mechanism that dispatches Command objects to their appropriate CommandHandler. CommandHandlers can subscribe and * The mechanism that dispatches Command objects to their appropriate CommandHandler. CommandHandlers can subscribe and
* unsubscribe to specific types of commands on the command bus. Only a single handler may be subscribed for a single * unsubscribe to specific types of commands on the command bus. Only a single handler may be subscribed for a single
Expand All @@ -36,7 +38,9 @@ public interface CommandBus {
* @throws NoHandlerForCommandException when no command handler is registered for the given <code>command</code>. * @throws NoHandlerForCommandException when no command handler is registered for the given <code>command</code>.
* @see GenericCommandMessage#asCommandMessage(Object) * @see GenericCommandMessage#asCommandMessage(Object)
*/ */
void dispatch(CommandMessage<?> command); default <C> void dispatch(CommandMessage<C> command) {
dispatch(command, LoggingCallback.INSTANCE);
}


/** /**
* Dispatch the given <code>command</code> to the CommandHandler subscribed to that type of <code>command</code>. * Dispatch the given <code>command</code> to the CommandHandler subscribed to that type of <code>command</code>.
Expand All @@ -55,7 +59,7 @@ public interface CommandBus {
* @throws NoHandlerForCommandException when no command handler is registered for the given <code>command</code>. * @throws NoHandlerForCommandException when no command handler is registered for the given <code>command</code>.
* @see GenericCommandMessage#asCommandMessage(Object) * @see GenericCommandMessage#asCommandMessage(Object)
*/ */
<R> void dispatch(CommandMessage<?> command, CommandCallback<R> callback); <C, R> void dispatch(CommandMessage<C> command, CommandCallback<? super C, R> callback);


/** /**
* Subscribe the given <code>handler</code> to commands of type <code>commandType</code>. * Subscribe the given <code>handler</code> to commands of type <code>commandType</code>.
Expand Down
Expand Up @@ -18,25 +18,28 @@


/** /**
* Interface describing a callback that is invoked when command handler execution has finished. Depending of the outcome * Interface describing a callback that is invoked when command handler execution has finished. Depending of the outcome
* of the execution, either the {@link #onSuccess(Object)} or the {@link #onFailure(Throwable)} is called. * of the execution, either the {@link #onSuccess(CommandMessage, Object)} or the {@link #onFailure(CommandMessage, Throwable)} is called.
* *
* @param <R> the type of result of the command handling * @param <R> the type of result of the command handling
* @param <C> the type of payload of the command
* @author Allard Buijze * @author Allard Buijze
* @since 0.6 * @since 0.6
*/ */
public interface CommandCallback<R> { public interface CommandCallback<C, R> {


/** /**
* Invoked when command handling execution was successful. * Invoked when command handling execution was successful.
* *
* @param commandMessage The message that was dispatched
* @param result The result of the command handling execution, if any. * @param result The result of the command handling execution, if any.
*/ */
void onSuccess(R result); void onSuccess(CommandMessage<? extends C> commandMessage, R result);


/** /**
* Invoked when command handling execution resulted in an error. * Invoked when command handling execution resulted in an error.
* *
* @param commandMessage The message that was dispatched
* @param cause The exception raised during command handling * @param cause The exception raised during command handling
*/ */
void onFailure(Throwable cause); void onFailure(CommandMessage<? extends C> commandMessage, Throwable cause);
} }
Expand Up @@ -33,5 +33,5 @@ public interface CommandDispatchInterceptor {
* @param commandMessage The command message intended to be dispatched on the Command Bus * @param commandMessage The command message intended to be dispatched on the Command Bus
* @return the command message to dispatch on the Command Bus * @return the command message to dispatch on the Command Bus
*/ */
CommandMessage<?> handle(CommandMessage<?> commandMessage); <C> CommandMessage<? extends C> handle(CommandMessage<C> commandMessage);
} }
Expand Up @@ -46,11 +46,12 @@ public class GenericCommandMessage<T> implements CommandMessage<T> {
* @return a CommandMessage containing given <code>command</code> as payload, or <code>command</code> if it already * @return a CommandMessage containing given <code>command</code> as payload, or <code>command</code> if it already
* implements CommandMessage. * implements CommandMessage.
*/ */
public static CommandMessage asCommandMessage(Object command) { @SuppressWarnings("unchecked")
public static <C> CommandMessage<C> asCommandMessage(Object command) {
if (CommandMessage.class.isInstance(command)) { if (CommandMessage.class.isInstance(command)) {
return (CommandMessage) command; return (CommandMessage<C>) command;
} }
return new GenericCommandMessage<>(command); return new GenericCommandMessage<>((C) command);
} }


/** /**
Expand Down
Expand Up @@ -20,7 +20,7 @@
* The RollbackConfiguration defines when a UnitOfWork should be rolled back when the command handler or any * The RollbackConfiguration defines when a UnitOfWork should be rolled back when the command handler or any
* interceptors * interceptors
* reported a failure. This decision will only impact the UnitOfWork (and potentially any transactions attached to it), * reported a failure. This decision will only impact the UnitOfWork (and potentially any transactions attached to it),
* the {@link org.axonframework.commandhandling.CommandCallback#onFailure(Throwable)} will always be called. * the {@link CommandCallback#onFailure(CommandMessage, Throwable)} will always be called.
* *
* @author Martin Tilma * @author Martin Tilma
* @since 1.1 * @since 1.1
Expand Down
Expand Up @@ -16,7 +16,6 @@


package org.axonframework.commandhandling; package org.axonframework.commandhandling;


import org.axonframework.commandhandling.callbacks.LoggingCallback;
import org.axonframework.unitofwork.DefaultUnitOfWorkFactory; import org.axonframework.unitofwork.DefaultUnitOfWorkFactory;
import org.axonframework.unitofwork.TransactionManager; import org.axonframework.unitofwork.TransactionManager;
import org.axonframework.unitofwork.UnitOfWork; import org.axonframework.unitofwork.UnitOfWork;
Expand Down Expand Up @@ -61,14 +60,8 @@ public class SimpleCommandBus implements CommandBus {
public SimpleCommandBus() { public SimpleCommandBus() {
} }


@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
@Override @Override
public void dispatch(CommandMessage<?> command) { public <C, R> void dispatch(CommandMessage<C> command, final CommandCallback<? super C, R> callback) {
doDispatch(intercept(command), new LoggingCallback(command));
}

@Override
public <R> void dispatch(CommandMessage<?> command, final CommandCallback<R> callback) {
doDispatch(intercept(command), callback); doDispatch(intercept(command), callback);
} }


Expand All @@ -78,8 +71,8 @@ public <R> void dispatch(CommandMessage<?> command, final CommandCallback<R> cal
* @param command The original command being dispatched * @param command The original command being dispatched
* @return The command to actually dispatch * @return The command to actually dispatch
*/ */
protected CommandMessage<?> intercept(CommandMessage<?> command) { protected <C> CommandMessage<? extends C> intercept(CommandMessage<C> command) {
CommandMessage<?> commandToDispatch = command; CommandMessage<? extends C> commandToDispatch = command;
for (CommandDispatchInterceptor interceptor : dispatchInterceptors) { for (CommandDispatchInterceptor interceptor : dispatchInterceptors) {
commandToDispatch = interceptor.handle(commandToDispatch); commandToDispatch = interceptor.handle(commandToDispatch);
} }
Expand All @@ -94,13 +87,13 @@ protected CommandMessage<?> intercept(CommandMessage<?> command) {
* @param <R> The type of result expected from the command handler * @param <R> The type of result expected from the command handler
*/ */
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
protected <R> void doDispatch(CommandMessage<?> command, CommandCallback<R> callback) { protected <C, R> void doDispatch(CommandMessage<C> command, CommandCallback<? super C, R> callback) {
try { try {
CommandHandler handler = findCommandHandlerFor(command); CommandHandler handler = findCommandHandlerFor(command);
Object result = doDispatch(command, handler); Object result = doDispatch(command, handler);
callback.onSuccess((R) result); callback.onSuccess(command, (R) result);
} catch (Throwable throwable) { } catch (Throwable throwable) {
callback.onFailure(throwable); callback.onFailure(command, throwable);
} }
} }


Expand All @@ -113,7 +106,7 @@ private CommandHandler findCommandHandlerFor(CommandMessage<?> command) {
return handler; return handler;
} }


private Object doDispatch(CommandMessage<?> command, CommandHandler commandHandler) throws Throwable { private <C> Object doDispatch(CommandMessage<C> command, CommandHandler<? super C> commandHandler) throws Throwable {
logger.debug("Dispatching command [{}]", command.getCommandName()); logger.debug("Dispatching command [{}]", command.getCommandName());
UnitOfWork unitOfWork = unitOfWorkFactory.createUnitOfWork(); UnitOfWork unitOfWork = unitOfWorkFactory.createUnitOfWork();
InterceptorChain chain = new DefaultInterceptorChain(command, unitOfWork, commandHandler, handlerInterceptors); InterceptorChain chain = new DefaultInterceptorChain(command, unitOfWork, commandHandler, handlerInterceptors);
Expand Down

0 comments on commit 9f5709a

Please sign in to comment.