Skip to content

Commit

Permalink
Merge branch 'master' into axon-3.x
Browse files Browse the repository at this point in the history
Conflicts:
	amqp/pom.xml
	core/pom.xml
	core/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBus.java
	core/src/main/java/org/axonframework/domain/EventContainer.java
	core/src/main/java/org/axonframework/eventsourcing/AbstractEventSourcedAggregateRoot.java
	core/src/main/java/org/axonframework/saga/repository/jdbc/GenericSagaSqlSchema.java
	core/src/main/java/org/axonframework/serializer/ChainedConverter.java
	core/src/test/java/org/axonframework/commandhandling/annotation/AggregateAnnotationCommandHandlerTest.java
	core/src/test/java/org/axonframework/serializer/ChainedConverterTest.java
	distributed-commandbus/pom.xml
	distributed-commandbus/src/main/java/org/axonframework/commandhandling/distributed/jgroups/JGroupsConnector.java
	distributed-commandbus/src/test/java/org/axonframework/commandhandling/distributed/jgroups/JGroupsConnectorFactoryBeanTest.java
	documentation/pom.xml
	incubator/cassandra/pom.xml
	incubator/google-app-engine/pom.xml
	incubator/redis/pom.xml
	integration/pom.xml
	integrationtests/pom.xml
	mongo/pom.xml
	monitoring-jmx/pom.xml
	pom.xml
	quickstart/pom.xml
	spring/pom.xml
	test/pom.xml
  • Loading branch information
abuijze committed Mar 18, 2015
2 parents 6e0d460 + 8477373 commit 9405489
Show file tree
Hide file tree
Showing 50 changed files with 2,173 additions and 268 deletions.
3 changes: 1 addition & 2 deletions core/pom.xml
Expand Up @@ -15,8 +15,7 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.axonframework</groupId>
<artifactId>axon</artifactId>
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.axonframework.common.ReflectionUtils.fieldsOf;

Expand Down Expand Up @@ -104,19 +105,13 @@ private void processNestedEntityCommandHandlers(Class<?> targetType,
+ "the field is not assignable to java.util.Collection.",
targetType.getSimpleName(), field.getName()));
}
Class<?> entityType = annotation.entityType();
if (AbstractAnnotatedEntity.class.equals(entityType)) {
final Type genericType = field.getGenericType();
if (genericType == null
|| !(genericType instanceof ParameterizedType)
|| ((ParameterizedType) genericType).getActualTypeArguments().length == 0) {
throw new AxonConfigurationException(String.format(
"Field %s.%s is annotated with @CommandHandlingMemberCollection, but the entity"
+ " type is not indicated on the annotation, "
+ "nor can it be deducted from the generic parameters",
targetType.getSimpleName(), field.getName()));
}
entityType = (Class<?>) ((ParameterizedType) genericType).getActualTypeArguments()[0];
Class<?> entityType = determineEntityType(annotation.entityType(), field, 0);
if(entityType == null) {
throw new AxonConfigurationException(String.format(
"Field %s.%s is annotated with @CommandHandlingMemberCollection, but the entity"
+ " type is not indicated on the annotation, "
+ "nor can it be deduced from the generic parameters",
targetType.getSimpleName(), field.getName()));
}
if (logger.isDebugEnabled()) {
logger.debug("Field {}.{} is annotated with @CommandHandlingMemberCollection. "
Expand All @@ -125,8 +120,30 @@ private void processNestedEntityCommandHandlers(Class<?> targetType,
);
}
newEntityAccessor = new EntityCollectionFieldAccessor(entityType, annotation, entityAccessor, field);
} else if (field.isAnnotationPresent(CommandHandlingMemberMap.class)) {
CommandHandlingMemberMap annotation = field.getAnnotation(CommandHandlingMemberMap.class);
if (!Map.class.isAssignableFrom(field.getType())) {
throw new AxonConfigurationException(String.format(
"Field %s.%s is annotated with @CommandHandlingMemberMap, but the declared type of "
+ "the field is not assignable to java.util.Map.",
targetType.getSimpleName(), field.getName()));
}
Class<?> entityType = determineEntityType(annotation.entityType(), field, 1);
if(entityType == null) {
throw new AxonConfigurationException(String.format(
"Field %s.%s is annotated with @CommandHandlingMemberMap, but the entity"
+ " type is not indicated on the annotation, "
+ "nor can it be deduced from the generic parameters",
targetType.getSimpleName(), field.getName()));
}
if (logger.isDebugEnabled()) {
logger.debug("Field {}.{} is annotated with @CommandHandlingMemberMap. "
+ "Checking {} for Command Handlers",
targetType.getSimpleName(), field.getName(), entityType.getSimpleName()
);
}
newEntityAccessor = new EntityMapFieldAccessor(entityType, annotation, entityAccessor, field);
}

if (newEntityAccessor != null) {
MethodMessageHandlerInspector fieldInspector = MethodMessageHandlerInspector
.getInstance(newEntityAccessor.entityType(),
Expand All @@ -148,6 +165,19 @@ private void processNestedEntityCommandHandlers(Class<?> targetType,
}
}

private Class<?> determineEntityType(Class<?> entityType, Field field, int genericTypeIndex) {
if (AbstractAnnotatedEntity.class.equals(entityType)) {
final Type genericType = field.getGenericType();
if (genericType == null
|| !(genericType instanceof ParameterizedType)
|| ((ParameterizedType) genericType).getActualTypeArguments().length == 0) {
return null;
}
entityType = (Class<?>) ((ParameterizedType) genericType).getActualTypeArguments()[genericTypeIndex];
}
return entityType;
}

/**
* Returns a list of constructor handlers on the given aggregate type.
*
Expand Down Expand Up @@ -186,7 +216,7 @@ public EntityForwardingMethodMessageHandler(EntityAccessor entityAccessor, Abstr

@Override
public Object invoke(Object target, Message message) throws InvocationTargetException, IllegalAccessException {
Object entity = entityAccessor.getInstance(target, (CommandMessage) message);
Object entity = entityAccessor.getInstance(target, (CommandMessage<?>) message);
if (entity == null) {
throw new IllegalStateException("No appropriate entity available in the aggregate. "
+ "The command cannot be handled.");
Expand Down Expand Up @@ -242,21 +272,20 @@ public Class<?> entityType() {
}
}

private class EntityCollectionFieldAccessor implements EntityAccessor {
private abstract class MultipleEntityFieldAccessor<T> implements EntityAccessor {

private final Class<?> entityType;
private final CommandHandlingMemberCollection annotation;
private final EntityAccessor entityAccessor;
private final Field field;
private final Property<Object> entityProperty;
private String commandTargetProperty;


@SuppressWarnings("unchecked")
public EntityCollectionFieldAccessor(Class entityType, CommandHandlingMemberCollection annotation,
EntityAccessor entityAccessor, Field field) {
this.entityProperty = PropertyAccessStrategy.getProperty(entityType, annotation.entityId());
public MultipleEntityFieldAccessor(Class entityType, String commandTargetProperty,
EntityAccessor entityAccessor, Field field) {
this.entityType = entityType;
this.annotation = annotation;
this.entityAccessor = entityAccessor;
this.commandTargetProperty = commandTargetProperty;
this.field = field;
}

Expand All @@ -267,9 +296,8 @@ public Object getInstance(Object aggregateRoot, CommandMessage<?> command) throw
if (parentEntity == null) {
return null;
}
Collection<?> entityCollection = (Collection<?>) ReflectionUtils.getFieldValue(field, parentEntity);
Property<Object> commandProperty = PropertyAccessStrategy.getProperty(command.getPayloadType(),
annotation.commandTargetProperty());
T entityCollection = (T) ReflectionUtils.getFieldValue(field, parentEntity);
Property<Object> commandProperty = PropertyAccessStrategy.getProperty(command.getPayloadType(), commandTargetProperty);

if (commandProperty == null) {
// TODO: Log failure. It seems weird that the property is not present
Expand All @@ -279,18 +307,49 @@ public Object getInstance(Object aggregateRoot, CommandMessage<?> command) throw
if (commandId == null) {
return null;
}
for (Object entity : entityCollection) {
Object entityId = entityProperty.getValue(entity);
if (entityId != null && entityId.equals(commandId)) {
return entity;
}
}
return null;
return getEntity(entityCollection, commandId);
}

protected abstract Object getEntity(T entities, Object commandId);

@Override
public Class<?> entityType() {
return entityType;
}
}

private class EntityCollectionFieldAccessor extends MultipleEntityFieldAccessor<Collection<?>> {
private final Property<Object> entityProperty;

@SuppressWarnings("unchecked")
public EntityCollectionFieldAccessor(Class entityType, CommandHandlingMemberCollection annotation,
EntityAccessor entityAccessor, Field field) {
super(entityType, annotation.commandTargetProperty(), entityAccessor, field);
this.entityProperty = PropertyAccessStrategy.getProperty(entityType, annotation.entityId());
}

protected Object getEntity(Collection<?> entities, Object commandId) {
for (Object entity : entities) {
Object entityId = entityProperty.getValue(entity);
if (entityId != null && entityId.equals(commandId)) {
return entity;
}
}
return null;
}

}

private class EntityMapFieldAccessor extends MultipleEntityFieldAccessor<Map<?,?>> {

public EntityMapFieldAccessor(Class entityType, CommandHandlingMemberMap annotation,
EntityAccessor entityAccessor, Field field) {
super(entityType, annotation.commandTargetProperty(), entityAccessor, field);
}

@Override
protected Object getEntity(Map<?,?> entities, Object commandId) {
return entities.get(commandId);
}
}
}
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2010-2014. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.annotation;

import org.axonframework.eventsourcing.annotation.AbstractAnnotatedEntity;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Marker annotation for fields that contain a {@link java.util.Map} of Entities capable of handling Commands on behalf
* of the aggregate. When a field is annotated with <code>@CommandHandlerMemberMap</code>, it is a hint towards Command
* Handler discovery mechanisms that the entity should also be inspected for {@link
* org.axonframework.commandhandling.annotation.CommandHandler} annotated methods.
* <p/>
* Note that CommandHandler detection is done using static typing. This means that only the declared type of the field
* can be inspected. If a subclass of that type is assigned to the field, any handlers declared on that subclass will
* be ignored.
*
* @author Jeroen Bruinink
* @since 2.4
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface CommandHandlingMemberMap {

/**
* The name of the property on the incoming command's payload that identifies the intended target of the command.
* This identity should correspond to the keys in the annotated map.
* <p/>
* The name of this method must correspond with the getter method using the JavaBean specification (property 'id'
* is accessed using method 'getId()'), or any other specification supported by a configured
* {@link org.axonframework.common.property.PropertyAccessStrategy}.
*/
String commandTargetProperty();

/**
* The type of entity contained in the annotated map. By default, Axon attempts to identify the type by the
* generic parameters on the field declaration.
*/
Class<? extends AbstractAnnotatedEntity> entityType() default AbstractAnnotatedEntity.class;
}
Expand Up @@ -26,6 +26,7 @@
import org.axonframework.commandhandling.CommandHandlerInterceptor;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.interceptors.SerializationOptimizingInterceptor;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
Expand All @@ -49,6 +50,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.String.format;

/**
* Asynchronous CommandBus implementation with very high performance characteristics. It divides the command handling
* process in two steps, which can be executed in different threads. The CommandBus is backed by a {@link Disruptor},
Expand Down Expand Up @@ -118,13 +121,13 @@ public class DisruptorCommandBus implements CommandBus {
private final List<CommandHandlerInterceptor> publisherInterceptors;
private final ExecutorService executorService;
private final boolean rescheduleOnCorruptState;
private volatile boolean started = true;
private volatile boolean disruptorShutDown = false;
private final long coolingDownPeriod;
private final CommandTargetResolver commandTargetResolver;
private final int publisherCount;
private final int serializerCount;
private final CommandCallback<Object> failureLoggingCallback = new FailureLoggingCommandCallback();
private volatile boolean started = true;
private volatile boolean disruptorShutDown = false;

/**
* Initialize the DisruptorCommandBus with given resources, using default configuration settings. Uses a Blocking
Expand Down Expand Up @@ -254,6 +257,12 @@ public <R> void dispatch(CommandMessage<?> command, CommandCallback<R> callback)
*/
public <R> void doDispatch(CommandMessage command, CommandCallback<R> callback) {
Assert.state(!disruptorShutDown, "Disruptor has been shut down. Cannot dispatch or re-dispatch commands");
final CommandHandler<?> commandHandler = commandHandlers.get(command.getCommandName());
if (commandHandler == null) {
throw new NoHandlerForCommandException(format("No handler was subscribed to command [%s]",
command.getCommandName()));
}

RingBuffer<CommandHandlingEntry> ringBuffer = disruptor.getRingBuffer();
int invokerSegment = 0;
int publisherSegment = 0;
Expand All @@ -274,13 +283,17 @@ public <R> void doDispatch(CommandMessage command, CommandCallback<R> callback)
}
}
long sequence = ringBuffer.next();
CommandHandlingEntry event = ringBuffer.get(sequence);
event.reset(command, commandHandlers.get(command.getCommandName()), invokerSegment, publisherSegment,
serializerSegment, new BlacklistDetectingCallback<>(callback, command, disruptor.getRingBuffer(),
this, rescheduleOnCorruptState),
invokerInterceptors, publisherInterceptors
);
ringBuffer.publish(sequence);
try {
CommandHandlingEntry event = ringBuffer.get(sequence);
event.reset(command, commandHandler, invokerSegment, publisherSegment,
serializerSegment, new BlacklistDetectingCallback<>(callback, command,
disruptor.getRingBuffer(),
this, rescheduleOnCorruptState),
invokerInterceptors, publisherInterceptors
);
} finally {
ringBuffer.publish(sequence);
}
}

/**
Expand Down Expand Up @@ -368,27 +381,6 @@ public void onFailure(Throwable cause) {
}
}

private class ExceptionHandler implements com.lmax.disruptor.ExceptionHandler {

@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
logger.error("Exception occurred while processing a {}.",
((CommandHandlingEntry) event).getCommand().getPayloadType().getSimpleName(),
ex);
}

@Override
public void handleOnStartException(Throwable ex) {
logger.error("Failed to start the DisruptorCommandBus.", ex);
disruptor.shutdown();
}

@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("Error while shutting down the DisruptorCommandBus", ex);
}
}

private static class DisruptorRepository<T extends EventSourcedAggregateRoot> implements Repository<T> {

private final String typeIdentifier;
Expand Down Expand Up @@ -431,4 +423,25 @@ public DomainEventStream decorateForAppend(String aggregateType, EventSourcedAgg
return eventStream;
}
}

private class ExceptionHandler implements com.lmax.disruptor.ExceptionHandler {

@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
logger.error("Exception occurred while processing a {}.",
((CommandHandlingEntry) event).getCommand().getPayloadType().getSimpleName(),
ex);
}

@Override
public void handleOnStartException(Throwable ex) {
logger.error("Failed to start the DisruptorCommandBus.", ex);
disruptor.shutdown();
}

@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("Error while shutting down the DisruptorCommandBus", ex);
}
}
}
Expand Up @@ -193,7 +193,6 @@ private Throwable performCommit(DisruptorUnitOfWork unitOfWork, EventSourcedAggr
unitOfWork.onAfterCommit();
}
} catch (Exception e) {
e.printStackTrace();
if (transaction != null) {
transactionManager.rollbackTransaction(transaction);
}
Expand Down

0 comments on commit 9405489

Please sign in to comment.