Skip to content

Commit

Permalink
AXON-11603 Rewrite distributed commandbus to have a separate service …
Browse files Browse the repository at this point in the history
…registry, Added websockets implementation
  • Loading branch information
KoenLavooijTrifork committed Jun 17, 2016
1 parent f8489b8 commit 0ee3e51
Show file tree
Hide file tree
Showing 45 changed files with 1,934 additions and 1,583 deletions.
28 changes: 27 additions & 1 deletion distributed-commandbus/pom.xml
Expand Up @@ -15,7 +15,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>axon</artifactId>
<groupId>org.axonframework</groupId>
Expand Down Expand Up @@ -76,6 +77,21 @@
<version>3.4.2.Final</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-jdk-http</artifactId>
<version>2.22.2</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>2.22.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand All @@ -102,6 +118,16 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
Expand Down
Expand Up @@ -16,37 +16,40 @@

package org.axonframework.commandhandling.distributed;

import org.axonframework.commandhandling.AnnotationCommandTargetResolver;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;
import org.axonframework.common.AxonConfigurationException;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.axonframework.common.ReflectionUtils.*;

/**
* RoutingStrategy that expects an {@link TargetAggregateIdentifier}
* RoutingStrategy that expects an {@link org.axonframework.commandhandling.annotation.TargetAggregateIdentifier}
* annotation on the command message's payload. Commands are routed based on the identifier of the aggregate that they
* target. This approach ensures that commands to be processed by the same aggregate are dispatched to the same node in
* a DistributedCommandBus. See {@link AnnotationCommandTargetResolver}
* a DistributedCommandBus. See {@link org.axonframework.commandhandling.annotation.AnnotationCommandTargetResolver}
* for more details.
* <p/>
* This class requires the returned Aggregate Identifiers to implement a proper {@link Object#toString()} method. An
* inconsistent toString() method may result in different members using different routing keys for the same identifier.
*
* @author Allard Buijze
* @see AnnotationCommandTargetResolver
* @see org.axonframework.commandhandling.annotation.AnnotationCommandTargetResolver
* @see DistributedCommandBus
* @since 2.0
*/
public class AnnotationRoutingStrategy extends AbstractRoutingStrategy {

private final Class<? extends Annotation> annotationType;

private final Map<Class<?>, AggregateIdentifierResolver> resolverMap = new ConcurrentHashMap<>();

/**
* Initializes a Routing Strategy that fails when an incoming command does not define an AggregateIdentifier to
* base the routing key on.
Expand Down Expand Up @@ -108,18 +111,47 @@ protected String doResolveRoutingKey(CommandMessage<?> command) {
}

@SuppressWarnings("unchecked")
private <I> I findIdentifier(CommandMessage<?> command) throws InvocationTargetException, IllegalAccessException {
for (Method m : methodsOf(command.getPayloadType())) {
private String findIdentifier(CommandMessage<?> command) throws InvocationTargetException, IllegalAccessException {
return resolverMap.computeIfAbsent(command.getPayloadType(), this::createResolver).identify(command);
}

private AggregateIdentifierResolver createResolver(Class<?> type) {
for (Method m : methodsOf(type)) {
if (m.isAnnotationPresent(annotationType)) {
ensureAccessible(m);
return (I) m.invoke(command.getPayload());
return new AggregateIdentifierResolver(m);
}
}
for (Field f : fieldsOf(command.getPayloadType())) {
for (Field f : fieldsOf(type)) {
if (f.isAnnotationPresent(annotationType)) {
return (I) getFieldValue(f, command.getPayload());
return new AggregateIdentifierResolver(f);
}
}
return NO_RESOLVE;
}

private static final AggregateIdentifierResolver NO_RESOLVE = new AggregateIdentifierResolver((Method) null);
private static final class AggregateIdentifierResolver {
private final Method method;
private final Field field;

public AggregateIdentifierResolver(Method method) {
this.method = method;
this.field = null;
}

public AggregateIdentifierResolver(Field field) {
this.method = null;
this.field = field;
}

public String identify(Object command) throws InvocationTargetException, IllegalAccessException {
if (method != null) {
return (String) method.invoke(command);
} else if (field != null) {
return (String) field.get(command);
}
return null;
}
return null;
}
}
Expand Up @@ -16,18 +16,29 @@

package org.axonframework.commandhandling.distributed;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandler;

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Interface describing the component that remotely connects multiple CommandBus instances.
*
* @author Allard Buijze
* @since 2.0
*/
public interface CommandBusConnector {
public interface CommandBusConnector<T> {
/**
* Get the local endpoint for this connector
* @return The local endpoint identifier
*/
T getLocalEndpoint();

/**
* Sends the given <code>command</code> to the node assigned to handle messages with the given
Expand All @@ -40,12 +51,11 @@ public interface CommandBusConnector {
* will result in the command being sent to the same member. Each message must be sent to <em>exactly one
* member</em>.
*
* @param routingKey The key describing the routing requirements of this command. Generally, commands with the same
* routingKey will be sent to the same destination.
* @param command The command to send to the (remote) member
* @param destination The member of the network to send the message to
* @param command The command to send to the (remote) member
* @throws Exception when an error occurs before or during the sending of the message
*/
<C> void send(String routingKey, CommandMessage<C> command) throws Exception;
<C> void send(T destination, CommandMessage<? extends C> command) throws Exception;

/**
* Sends the given <code>command</code> to the node assigned to handle messages with the given
Expand All @@ -64,28 +74,25 @@ public interface CommandBusConnector {
* Connectors route the commands based on the given <code>routingKey</code>. Using the same <code>routingKey</code>
* will result in the command being sent to the same member.
*
* @param routingKey The key describing the routing requirements of this command. Generally, commands with the same
* routingKey will be sent to the same destination.
* @param command The command to send to the (remote) member
* @param callback The callback on which result notifications are sent
* @param <R> The type of object expected as return value in the callback
* @param destination The member of the network to send the message to
* @param command The command to send to the (remote) member
* @param callback The callback
* @param <C> The type of object expected as command
* @param <R> The type of object expected as result of the command
* @throws Exception when an error occurs before or during the sending of the message
*/
<C, R> void send(String routingKey, CommandMessage<C> command, CommandCallback<? super C, R> callback)
throws Exception;
<C, R> void send(T destination, CommandMessage<C> command, CommandCallback<? super C, R> callback);

/**
* Subscribe the given <code>handler</code> to commands of type <code>commandType</code> to the local segment of
* the
* command bus.
* <p/>
* If a subscription already exists for the given type, the behavior is undefined. Implementations may throw an
* Exception to refuse duplicate subscription or alternatively decide whether the existing or new
* <code>handler</code> gets the subscription.
*
* @param commandName The name of the command to subscribe the handler to
* @param handler The handler instance that handles the given type of command
* @return a handle to unsubscribe the <code>handler</code>. When unsubscribed it will no longer receive commands.
* Triggered when a new set of members is identified. CommandBusConnector implementations can use this to purge
* callbacks of pending commands on remote hosts
* @param newMembers The new set of members
*/
void updateMembers(Set<T> newMembers);

/**
* Get the load factor of this member in the network
* @return The load factor
*/
Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler);
int getLoadFactor();
}
@@ -0,0 +1,25 @@
package org.axonframework.commandhandling.distributed;

import org.axonframework.common.AxonTransientException;

/**
* Exception thrown when the CommandBusConnector has a communication failure
*/
public class CommandBusConnectorCommunicationException extends AxonTransientException {
/**
* Initializes the CommandBusConnectorCommunicationException
* @param message The message of the exception
*/
public CommandBusConnectorCommunicationException(String message) {
super(message);
}

/**
* Initializes the CommandBusConnectorCommunicationException
* @param message The message of the exception
* @param cause The cause of this exception
*/
public CommandBusConnectorCommunicationException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,41 @@
package org.axonframework.commandhandling.distributed;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author koen
* on 11-5-16.
*/
public class CommandCallbackRepository<A> {
final Map<String, CommandCallbackWrapper> callbacks = new ConcurrentHashMap<>();

public void cancelCallbacks(A channelId) {
Iterator<CommandCallbackWrapper> callbacks = this.callbacks.values().iterator();
while (callbacks.hasNext()) {
CommandCallbackWrapper wrapper = callbacks.next();
if (wrapper.getChannelIdentifier().equals(channelId)) {
wrapper.fail(new CommandBusConnectorCommunicationException(String.format(
"Connection error while waiting for a response on command %s",
wrapper.getMessage().getCommandName())));
callbacks.remove();
}
}
}

@SuppressWarnings("unchecked")
public <A, C, R> CommandCallbackWrapper<A, C, R> fetchAndRemove(String callbackId) {
return callbacks.remove(callbackId);
}

public <A, C, R> void store(String callbackId, CommandCallbackWrapper<A, C, R> commandCallbackWrapper) {
CommandCallbackWrapper previous;
if ((previous = callbacks.put(callbackId, commandCallbackWrapper)) != null) {
//a previous callback with the same command ID was already found, we will cancel the callback as the command
//is likely to be retried, so the previous one likely failed
previous.fail(new CommandBusConnectorCommunicationException(
"Command-callback cancelled, a new command with the same ID is entered into the command bus"));
}
}
}
@@ -0,0 +1,47 @@
package org.axonframework.commandhandling.distributed;

import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;

/**
* @author koen
* on 11-5-16.
*/

public class CommandCallbackWrapper<A, C, R> implements CommandCallback<C, R> {
private final CommandCallback<? super C, R> wrapped;
private final A sessionId;
private final CommandMessage<C> message;

public CommandCallbackWrapper(A sessionId, CommandMessage<C> message, CommandCallback<? super C, R> callback) {
this.wrapped = callback;
this.sessionId = sessionId;
this.message = message;
}

public CommandMessage<C> getMessage() {
return message;
}

public A getChannelIdentifier() {
return sessionId;
}

public void fail(Throwable e) {
onFailure(getMessage(), e);
}

public void success(R returnValue) {
onSuccess(getMessage(), returnValue);
}

@Override
public void onSuccess(CommandMessage<? extends C> message, R result) {
wrapped.onSuccess(message, result);
}

@Override
public void onFailure(CommandMessage<? extends C> message, Throwable cause) {
wrapped.onFailure(message, cause);
}
}

0 comments on commit 0ee3e51

Please sign in to comment.