Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Moved classes involving messaging to appropriate package
Any classes in the eventhandling or commandhandling package were generic. They
have been moved to the messaging package instead. There is now also only one
HandlerDefinition implementation left: AnnotatedMessageHandlingMemberDefinition

Command and Event handler methods are (meta) annotated with @MessageHandler,
which now uniquely identifies a method/constructor as a message handler. The
messageType property of the annotation defines the type of message the method
expects.
  • Loading branch information
abuijze committed Aug 1, 2016
1 parent 0979659 commit b9556fb
Show file tree
Hide file tree
Showing 81 changed files with 793 additions and 341 deletions.
Expand Up @@ -16,11 +16,13 @@
import org.axonframework.commandhandling.model.Aggregate; import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.Repository; import org.axonframework.commandhandling.model.Repository;
import org.axonframework.commandhandling.model.inspection.AggregateModel; import org.axonframework.commandhandling.model.inspection.AggregateModel;
import org.axonframework.commandhandling.model.inspection.CommandMessageHandlingMember;
import org.axonframework.commandhandling.model.inspection.ModelInspector; import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.Registration; import org.axonframework.common.Registration;
import org.axonframework.common.annotation.ClasspathParameterResolverFactory; import org.axonframework.messaging.annotation.ClasspathParameterResolverFactory;
import org.axonframework.common.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.MessageHandlingMember;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.MessageHandler; import org.axonframework.messaging.MessageHandler;


import java.util.*; import java.util.*;
Expand Down Expand Up @@ -130,7 +132,9 @@ private Map<String, MessageHandler<CommandMessage<?>>> initializeHandlers(Aggreg
Map<String, MessageHandler<CommandMessage<?>>> handlersFound = new HashMap<>(); Map<String, MessageHandler<CommandMessage<?>>> handlersFound = new HashMap<>();
AggregateCommandHandler aggregateCommandHandler = new AggregateCommandHandler(); AggregateCommandHandler aggregateCommandHandler = new AggregateCommandHandler();
aggregateModel.commandHandlers().forEach((k, v) -> { aggregateModel.commandHandlers().forEach((k, v) -> {
if (v.isFactoryHandler()) { if (v.unwrap(CommandMessageHandlingMember.class)
.map(CommandMessageHandlingMember::isFactoryHandler)
.orElse(false)) {
handlersFound.put(k, new AggregateConstructorCommandHandler(v)); handlersFound.put(k, new AggregateConstructorCommandHandler(v));
} else { } else {
handlersFound.put(k, aggregateCommandHandler); handlersFound.put(k, aggregateCommandHandler);
Expand Down Expand Up @@ -165,9 +169,9 @@ public Set<String> supportedCommandNames() {


private class AggregateConstructorCommandHandler implements MessageHandler<CommandMessage<?>> { private class AggregateConstructorCommandHandler implements MessageHandler<CommandMessage<?>> {


private final org.axonframework.common.annotation.MessageHandler<?> handler; private final MessageHandlingMember<?> handler;


public AggregateConstructorCommandHandler(org.axonframework.common.annotation.MessageHandler<?> handler) { public AggregateConstructorCommandHandler(MessageHandlingMember<?> handler) {
this.handler = handler; this.handler = handler;
} }


Expand Down
Expand Up @@ -17,8 +17,8 @@
import org.axonframework.commandhandling.model.inspection.ModelInspector; import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.Registration; import org.axonframework.common.Registration;
import org.axonframework.common.annotation.ClasspathParameterResolverFactory; import org.axonframework.messaging.annotation.ClasspathParameterResolverFactory;
import org.axonframework.common.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.MessageHandler; import org.axonframework.messaging.MessageHandler;


import java.util.ArrayDeque; import java.util.ArrayDeque;
Expand Down
Expand Up @@ -16,6 +16,7 @@


package org.axonframework.commandhandling; package org.axonframework.commandhandling;


import org.axonframework.messaging.annotation.MessageHandler;
import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.messaging.unitofwork.UnitOfWork;


import java.lang.annotation.*; import java.lang.annotation.*;
Expand All @@ -42,6 +43,7 @@
@Documented @Documented
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.ANNOTATION_TYPE}) @Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.ANNOTATION_TYPE})
@MessageHandler(messageType = CommandMessage.class)
public @interface CommandHandler { public @interface CommandHandler {


/** /**
Expand All @@ -56,4 +58,10 @@
* value to the correct instance. * value to the correct instance.
*/ */
String routingKey() default ""; String routingKey() default "";

/**
* The type of payload expected by this handler. Defaults to the expected types expresses by (primarily the first)
* parameters of the annotated Method or Constructor.
*/
Class<?> payloadType() default Object.class;
} }
Expand Up @@ -14,8 +14,8 @@
package org.axonframework.commandhandling; package org.axonframework.commandhandling;


import org.axonframework.common.Priority; import org.axonframework.common.Priority;
import org.axonframework.common.annotation.ParameterResolver; import org.axonframework.messaging.annotation.ParameterResolver;
import org.axonframework.common.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.Message; import org.axonframework.messaging.Message;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.messaging.unitofwork.UnitOfWork;
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.CollectionUtils; import org.axonframework.common.CollectionUtils;
import org.axonframework.common.ReflectionUtils; import org.axonframework.common.ReflectionUtils;
import org.axonframework.common.annotation.MetaData; import org.axonframework.messaging.annotation.MetaData;
import org.axonframework.messaging.MessageDispatchInterceptor; import org.axonframework.messaging.MessageDispatchInterceptor;


import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.axonframework.commandhandling.model.inspection.AggregateModel; import org.axonframework.commandhandling.model.inspection.AggregateModel;
import org.axonframework.commandhandling.model.inspection.ModelInspector; import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.messaging.unitofwork.UnitOfWork;


Expand Down
Expand Up @@ -17,7 +17,7 @@
package org.axonframework.commandhandling.model; package org.axonframework.commandhandling.model;


import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.common.lock.Lock; import org.axonframework.common.lock.Lock;
import org.axonframework.common.lock.LockFactory; import org.axonframework.common.lock.LockFactory;
import org.axonframework.common.lock.PessimisticLockFactory; import org.axonframework.common.lock.PessimisticLockFactory;
Expand Down
Expand Up @@ -35,6 +35,7 @@


public class AggregateMemberAnnotatedChildEntityCollectionDefinition implements ChildEntityDefinition { public class AggregateMemberAnnotatedChildEntityCollectionDefinition implements ChildEntityDefinition {


@SuppressWarnings("unchecked")
@Override @Override
public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityModel<T> declaringEntity) { public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityModel<T> declaringEntity) {
Map<String, Object> attributes = AnnotationUtils.findAnnotationAttributes(field, AggregateMember.class).orElse(null); Map<String, Object> attributes = AnnotationUtils.findAnnotationAttributes(field, AggregateMember.class).orElse(null);
Expand All @@ -43,26 +44,27 @@ public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityMod
} }


EntityModel<Object> childEntityModel = declaringEntity.modelOf(resolveType(attributes, field)); EntityModel<Object> childEntityModel = declaringEntity.modelOf(resolveType(attributes, field));
String parentRoutingKey = declaringEntity.routingKey();
Map<String, Property<Object>> routingKeyProperties = Map<String, Property<Object>> routingKeyProperties =
childEntityModel.commandHandlers().values().stream() childEntityModel.commandHandlers().values().stream()
.map(h -> h.unwrap(CommandMessageHandlingMember.class).orElse(null))
.filter(h -> h != null)
.collect(Collectors.toConcurrentMap( .collect(Collectors.toConcurrentMap(
CommandMessageHandler::commandName, CommandMessageHandlingMember::commandName,
h -> getProperty(h.payloadType(), h -> getProperty(h.payloadType(),
getOrDefault(h.routingKey(), childEntityModel.routingKey())))); getOrDefault(childEntityModel.routingKey(), h.routingKey()))));
//noinspection unchecked //noinspection unchecked
return Optional.of(new AnnotatedChildEntity<>( return Optional.of(new AnnotatedChildEntity<>(
parentRoutingKey, field, childEntityModel, field, childEntityModel,
(Boolean) attributes.get("forwardCommands"), (Boolean) attributes.get("forwardCommands"),
(Boolean) attributes.get("forwardEvents"), (Boolean) attributes.get("forwardEvents"),
(msg, parent) -> { (msg, parent) -> {
Object routingValue = routingKeyProperties.get(msg.getCommandName()).getValue(msg.getPayload()); Object routingValue = routingKeyProperties.get(msg.getCommandName()).getValue(msg.getPayload());
Iterable<?> iterable = (Iterable) ReflectionUtils.getFieldValue(field, parent); Iterable<?> iterable = ReflectionUtils.getFieldValue(field, parent);
return StreamSupport.stream(iterable.spliterator(), false) return StreamSupport.stream(iterable.spliterator(), false)
.filter(i -> Objects.equals(routingValue, childEntityModel.getIdentifier(i))) .filter(i -> Objects.equals(Objects.toString(routingValue, ""), childEntityModel.getIdentifier(i)))
.findFirst() .findFirst().orElse(null);
.orElse(null); },
})); (msg, parent) -> ReflectionUtils.getFieldValue(field, parent)));
} }


private static Class<?> resolveType(Map<String, Object> attributes, Field field) { private static Class<?> resolveType(Map<String, Object> attributes, Field field) {
Expand Down
Expand Up @@ -24,6 +24,9 @@
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;


import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;

public class AggregateMemberAnnotatedChildEntityDefinition implements ChildEntityDefinition { public class AggregateMemberAnnotatedChildEntityDefinition implements ChildEntityDefinition {


@Override @Override
Expand All @@ -37,11 +40,14 @@ public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityMod
} }


EntityModel entityModel = declaringEntity.modelOf(field.getType()); EntityModel entityModel = declaringEntity.modelOf(field.getType());
String parentRoutingKey = declaringEntity.routingKey(); return Optional.of(new AnnotatedChildEntity<>(field, entityModel,
return Optional.of(new AnnotatedChildEntity<>(parentRoutingKey, field, entityModel,
(Boolean) attributes.get("forwardCommands"), (Boolean) attributes.get("forwardCommands"),
(Boolean) attributes.get("forwardEvents"), (Boolean) attributes.get("forwardEvents"),
(msg, parent) -> ReflectionUtils.getFieldValue(field, parent))); (msg, parent) -> ReflectionUtils.getFieldValue(field, parent),
(msg, parent) -> {
Object fieldVal = ReflectionUtils.getFieldValue(field, parent);
return fieldVal == null ? emptyList() : singleton(fieldVal);
}));
} }


} }
Expand Up @@ -23,8 +23,7 @@
import org.axonframework.common.property.Property; import org.axonframework.common.property.Property;


import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Map; import java.util.*;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static java.lang.String.format; import static java.lang.String.format;
Expand All @@ -33,6 +32,20 @@


public class AggregateMemberAnnotatedChildEntityMapDefinition implements ChildEntityDefinition { public class AggregateMemberAnnotatedChildEntityMapDefinition implements ChildEntityDefinition {


private static Class<?> resolveType(Map<String, Object> attributes, Field field) {
Class<?> entityType = (Class<?>) attributes.get("type");
if (Void.class.equals(entityType)) {
entityType = ReflectionUtils.resolveGenericType(field, 1)
.orElseThrow(
() -> new AxonConfigurationException(
format("Unable to resolve entity type of field [%s]. " +
"Please provide type explicitly in @AggregateMember annotation.",
field.toGenericString())));
}

return entityType;
}

@Override @Override
public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityModel<T> declaringEntity) { public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityModel<T> declaringEntity) {
Map<String, Object> attributes = AnnotationUtils.findAnnotationAttributes(field, AggregateMember.class).orElse(null); Map<String, Object> attributes = AnnotationUtils.findAnnotationAttributes(field, AggregateMember.class).orElse(null);
Expand All @@ -41,16 +54,16 @@ public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityMod
} }


EntityModel<Object> childEntityModel = declaringEntity.modelOf(resolveType(attributes, field)); EntityModel<Object> childEntityModel = declaringEntity.modelOf(resolveType(attributes, field));
String parentRoutingKey = declaringEntity.routingKey();
Map<String, Property<Object>> routingKeyProperties = Map<String, Property<Object>> routingKeyProperties =
childEntityModel.commandHandlers().values().stream() childEntityModel.commandHandlers().values().stream()
.collect(Collectors.toConcurrentMap( .map(h -> h.unwrap(CommandMessageHandlingMember.class).orElse(null))
CommandMessageHandler::commandName, .filter(Objects::nonNull)
.collect(Collectors.toMap(
CommandMessageHandlingMember::commandName,
h -> { h -> {
String routingKey = getOrDefault(h.routingKey(), String routingKey = getOrDefault(h.routingKey(),
childEntityModel.routingKey()); childEntityModel.routingKey());
Property<Object> property = getProperty(h.payloadType(), Property<Object> property = getProperty(h.payloadType(), routingKey);
routingKey);
if (property == null) { if (property == null) {
throw new AxonConfigurationException(format("Command of type [%s] doesn't have a property matching the routing key [%s] necessary to route through field [%s]", throw new AxonConfigurationException(format("Command of type [%s] doesn't have a property matching the routing key [%s] necessary to route through field [%s]",
h.payloadType(), routingKey, field.toGenericString())); h.payloadType(), routingKey, field.toGenericString()));
Expand All @@ -59,28 +72,18 @@ public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityMod
})); }));
//noinspection unchecked //noinspection unchecked
return Optional.of(new AnnotatedChildEntity<>( return Optional.of(new AnnotatedChildEntity<>(
parentRoutingKey, field, childEntityModel, field, childEntityModel,
(Boolean) attributes.get("forwardCommands"), (Boolean) attributes.get("forwardCommands"),
(Boolean) attributes.get("forwardEvents"), (Boolean) attributes.get("forwardEvents"),
(msg, parent) -> { (msg, parent) -> {
Object routingValue = routingKeyProperties.get(msg.getCommandName()).getValue(msg.getPayload()); Object routingValue = routingKeyProperties.get(msg.getCommandName()).getValue(msg.getPayload());
Map<?, ?> fieldValue = (Map<?, ?>) ReflectionUtils.getFieldValue(field, parent); Map<?, ?> fieldValue = ReflectionUtils.getFieldValue(field, parent);
return fieldValue.get(routingValue); return fieldValue == null ? null : fieldValue.get(routingValue);
},
(msg, parent) -> {
Map<?, Object> fieldValue = ReflectionUtils.getFieldValue(field, parent);
return fieldValue == null ? Collections.emptyList() : fieldValue.values();
})); }));


} }

private static Class<?> resolveType(Map<String, Object> attributes, Field field) {
Class<?> entityType = (Class<?>) attributes.get("type");
if (Void.class.equals(entityType)) {
entityType = ReflectionUtils.resolveGenericType(field, 1)
.orElseThrow(
() -> new AxonConfigurationException(
format("Unable to resolve entity type of field [%s]. " +
"Please provide type explicitly in @AggregateMember annotation.",
field.toGenericString())));
}

return entityType;
}
} }
Expand Up @@ -21,7 +21,7 @@
import org.axonframework.commandhandling.model.AggregateInvocationException; import org.axonframework.commandhandling.model.AggregateInvocationException;
import org.axonframework.commandhandling.model.AggregateLifecycle; import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.commandhandling.model.ApplyMore; import org.axonframework.commandhandling.model.ApplyMore;
import org.axonframework.common.annotation.MessageHandler; import org.axonframework.messaging.annotation.MessageHandlingMember;
import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.GenericEventMessage;
Expand Down Expand Up @@ -156,7 +156,7 @@ protected void publishOnEventBus(EventMessage<?> msg) {
@Override @Override
public Object handle(CommandMessage<?> msg) throws Exception { public Object handle(CommandMessage<?> msg) throws Exception {
return executeWithResult(() -> { return executeWithResult(() -> {
MessageHandler<? super T> handler = inspector.commandHandlers().get(msg.getCommandName()); MessageHandlingMember<? super T> handler = inspector.commandHandlers().get(msg.getCommandName());
Object result = handler.handle(msg, aggregateRoot); Object result = handler.handle(msg, aggregateRoot);
if (aggregateRoot == null) { if (aggregateRoot == null) {
aggregateRoot = (T) result; aggregateRoot = (T) result;
Expand Down
Expand Up @@ -17,8 +17,8 @@
package org.axonframework.commandhandling.model.inspection; package org.axonframework.commandhandling.model.inspection;


import org.axonframework.commandhandling.CommandMessage; import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.common.ReflectionUtils;
import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.annotation.MessageHandlingMember;


import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -29,40 +29,46 @@ public class AnnotatedChildEntity<P, C> implements ChildEntity<P> {
private final Field field; private final Field field;
private final EntityModel<C> entityModel; private final EntityModel<C> entityModel;
private final boolean forwardEvents; private final boolean forwardEvents;
private final Map<String, CommandMessageHandler<? super P>> commandHandlers; private final Map<String, MessageHandlingMember<? super P>> commandHandlers;
private final BiFunction<EventMessage<?>, P, Iterable<C>> eventTargetResolver;


public AnnotatedChildEntity(String parentRoutingKey, Field field, EntityModel<C> entityModel, @SuppressWarnings("unchecked")
public AnnotatedChildEntity(Field field, EntityModel<C> entityModel,
boolean forwardCommands, boolean forwardEvents, boolean forwardCommands, boolean forwardEvents,
BiFunction<CommandMessage<?>, P, C> targetResolver) { BiFunction<CommandMessage<?>, P, C> commandTargetResolver,
BiFunction<EventMessage<?>, P, Iterable<C>> eventTargetResolver) {
this.field = field; this.field = field;
this.entityModel = entityModel; this.entityModel = entityModel;
this.forwardEvents = forwardEvents; this.forwardEvents = forwardEvents;

this.eventTargetResolver = eventTargetResolver;
this.commandHandlers = new HashMap<>(); this.commandHandlers = new HashMap<>();
if (forwardCommands) { if (forwardCommands) {
entityModel.commandHandlers().forEach((commandType, childHandler) -> commandHandlers.put(commandType, entityModel
new ChildForwardingCommandMessageHandler<>( .commandHandlers()
parentRoutingKey, .forEach((commandType, childHandler) -> {
childHandler, commandHandlers.put(commandType,
targetResolver))); new ChildForwardingCommandMessageHandlingMember<>(
entityModel.routingKey(),
childHandler,
commandTargetResolver));
});
} }
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void publish(EventMessage<?> msg, P declaringInstance) { public void publish(EventMessage<?> msg, P declaringInstance) {
if (forwardEvents) { if (forwardEvents) {
Object instance = ReflectionUtils.getFieldValue(field, declaringInstance); Iterable<C> targets = eventTargetResolver.apply(msg, declaringInstance);
if (instance != null) { if (targets != null) {
EntityModel runtimeModel = this.entityModel.modelOf(instance.getClass()); targets.forEach(target -> this.entityModel.publish(msg, target));
runtimeModel.publish(msg, instance);
} }
} }
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Map<String, CommandMessageHandler<? super P>> commandHandlers() { public Map<String, MessageHandlingMember<? super P>> commandHandlers() {
return commandHandlers; return commandHandlers;
} }


Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.axonframework.commandhandling.model.inspection; package org.axonframework.commandhandling.model.inspection;


import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.annotation.MessageHandlingMember;


import java.util.Map; import java.util.Map;


Expand All @@ -40,6 +41,6 @@ public interface ChildEntity<T> {
* *
* @return a map containing with the Command Names as keys and the handlers as values. * @return a map containing with the Command Names as keys and the handlers as values.
*/ */
Map<String, CommandMessageHandler<? super T>> commandHandlers(); Map<String, MessageHandlingMember<? super T>> commandHandlers();


} }

0 comments on commit b9556fb

Please sign in to comment.