Skip to content

Commit

Permalink
Replace ForwardingMode enum for ForwardingMode interface
Browse files Browse the repository at this point in the history
-Replace ForwardingMode enum for ForwardingMode interface
-Introduce a ForwardAll ForwardingMode implementation
-Introduce a ForwardNone ForwardingMode implementation
-Introduce a ForwardMatchingInstances ForwardingMode implementation
-Adjust AggregateMember annotation to take in a ForwardingMode class
instead of an enum
-Instantiate the right ForwardingMode in the
AbstractChildEntityDefinition based on the class type in the
AggregateMember
-Filter the event targets based on the eventForwardingMode in the
AbstractChildEntityDefinition implementations
-Fix AggregateMember usages in tests

[#388]
  • Loading branch information
smcvb committed Nov 23, 2017
1 parent 71c7fb9 commit 469e1e5
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@
boolean forwardEvents() default true;

/**
* Indicates the forwarding mode used for events within this entity. Defaults to {@code ForwardingMode.ALL} to allow
* all events through.
* forwarding events to this Aggregate Member.
* Indicates the forwarding mode used for events within this entity. Defaults to
* {@link org.axonframework.commandhandling.model.ForwardAll} to allow all events through.
*/
ForwardingMode eventForwardingMode() default ForwardingMode.ALL;
Class<? extends ForwardingMode> eventForwardingMode() default ForwardAll.class;

/**
* The property of the event to be used as a routing key towards this Aggregate Member. Defaults to {@code ""},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.model;

import org.axonframework.messaging.Message;

import java.util.function.Supplier;

/**
* Forward all messages {@code T} regardless of their set up.
*
* @param <T> the implementation {@code T} of the {@link org.axonframework.messaging.Message} being forwarded.
*/
public class ForwardAll<T extends Message<?>> implements ForwardingMode<T> {

public static final ForwardAll INSTANCE = new ForwardAll();

@Override
public ForwardingMode getInstance(Supplier<ForwardingMode> forwardingModeConstructor) {
return forwardingModeConstructor.get();
}

@Override
public <E> boolean forwardMessage(T message, E target) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.model;

import org.axonframework.commandhandling.model.inspection.EntityModel;
import org.axonframework.common.property.Property;
import org.axonframework.messaging.Message;

import java.util.Objects;
import java.util.function.Supplier;

import static org.axonframework.common.property.PropertyAccessStrategy.getProperty;

/**
* Only forward messages of type {@code T} if the routing key of the message matches the identifier of the entity.
*
* @param <T> the implementation {@code T} of the {@link org.axonframework.messaging.Message} being forwarded.
*/
public class ForwardMatchingInstances<T extends Message<?>> implements ForwardingMode<T> {

private static final String EMPTY_STRING = "";

private final String routingKey;
private final EntityModel childEntity;

public ForwardMatchingInstances(String routingKey,
EntityModel childEntity) {
this.routingKey = routingKey;
this.childEntity = childEntity;
}

@Override
public ForwardingMode getInstance(Supplier<ForwardingMode> forwardingModeConstructor) {
return forwardingModeConstructor.get();
}

@Override
@SuppressWarnings("unchecked")
public <E> boolean forwardMessage(T message, E target) {
Property routingProperty = getProperty(message.getPayloadType(), routingKey());

Object routingValue = routingProperty.getValue(message.getPayload());
Object identifier = childEntity.getIdentifier(target);

return Objects.equals(routingValue, identifier);
}

private String routingKey() {
return Objects.equals(routingKey, EMPTY_STRING) ? childEntity.routingKey() : routingKey;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.commandhandling.model;

import org.axonframework.messaging.Message;

import java.util.function.Supplier;

/**
* Forward no messages {@code T} regardless of their set up.
*
* @param <T> the implementation {@code T} of the {@link org.axonframework.messaging.Message} being forwarded.
*/
public class ForwardNone<T extends Message<?>> implements ForwardingMode<T> {

public static final ForwardNone INSTANCE = new ForwardNone();

@Override
public ForwardingMode getInstance(Supplier<ForwardingMode> forwardingModeConstructor) {
return forwardingModeConstructor.get();
}

@Override
public <E> boolean forwardMessage(T message, E target) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@

package org.axonframework.commandhandling.model;

import org.axonframework.messaging.Message;

import java.util.function.Supplier;

/**
* Enumeration describing the possible forwarding modes usable. Is for example used in the {@link
* org.axonframework.commandhandling.model.AggregateMember} to describe how events should be routed to it.
* Interface describing the required functionality to forward a message.
* An example implementation is the {@link ForwardAll}, which forwards all incoming messages.
*/
public enum ForwardingMode {
public interface ForwardingMode<T extends Message<?>> {

/**
* Forwards all the messages.
*/
ALL,
/**
* Forwards none of the messages.
* Creates an instance of an implementation of a {@link ForwardingMode}.
*
* @return an implementation of a {@link ForwardingMode}.
*/
NONE,
ForwardingMode getInstance(Supplier<ForwardingMode> forwardingModeConstructor);

/**
* Forwards messages based on a routing key value.
* Check whether the given {@code message} should be forwarded to the given {@code target}.
*
* @param message the message of type {@code T} to be forwarded.
* @param target the target of type {@code E} where the {@code message} shoudl be forwarded to
* @param <E> the type of the {@code target}
* @return true if the {@code message} should be forwarded; false if it should not.
*/
ROUTING_KEY

<E> boolean forwardMessage(T message, E target);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.model.AggregateMember;
import org.axonframework.commandhandling.model.ForwardAll;
import org.axonframework.commandhandling.model.ForwardMatchingInstances;
import org.axonframework.commandhandling.model.ForwardNone;
import org.axonframework.commandhandling.model.ForwardingMode;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.property.Property;
Expand All @@ -40,27 +43,29 @@
*/
public abstract class AbstractChildEntityDefinition implements ChildEntityDefinition {

@SuppressWarnings("unchecked")
@Override
public <T> Optional<ChildEntity<T>> createChildDefinition(Field field, EntityModel<T> declaringEntity) {
Map<String, Object> attributes = findAnnotationAttributes(field, AggregateMember.class).orElse(null);
if (attributes == null || fieldIsOfType(field)) {
return Optional.empty();
}

EntityModel<Object> childEntityModel = extractChildEntityModel(declaringEntity, attributes, field);

Boolean forwardEvents = (Boolean) attributes.get("forwardEvents");
ForwardingMode eventForwardingMode =
eventForwardingMode(forwardEvents, (ForwardingMode) attributes.get("eventForwardingMode"));
ForwardingMode eventForwardingMode = eventForwardingMode(
(Boolean) attributes.get("forwardEvents"),
(Class<? extends ForwardingMode>) attributes.get("eventForwardingMode"),
(String) attributes.get("eventRoutingKey"),
childEntityModel
);

return Optional.of(new AnnotatedChildEntity<>(
childEntityModel,
(Boolean) attributes.get("forwardCommands"),
(msg, parent) -> resolveCommandTarget(msg, parent, field, childEntityModel),
(msg, parent) -> resolveEventTarget(msg,
parent,
field,
eventForwardingMode,
(String) attributes.get("eventRoutingKey"),
childEntityModel)));
(msg, parent) -> resolveEventTarget(msg, parent, field, eventForwardingMode)
));
}

/**
Expand Down Expand Up @@ -139,8 +144,20 @@ private Property<Object> extractCommandHandlerRoutingKey(EntityModel<Object> chi
return property;
}

private ForwardingMode eventForwardingMode(Boolean forwardEvents, ForwardingMode eventForwardingMode) {
return !forwardEvents ? ForwardingMode.NONE : eventForwardingMode;
private ForwardingMode eventForwardingMode(Boolean forwardEvents,
Class<? extends ForwardingMode> eventForwardingMode,
String eventRoutingKey,
EntityModel<Object> childEntityModel) {
if (!forwardEvents) {
return ForwardNone.INSTANCE;
}

if (eventForwardingMode.equals(ForwardAll.class)) {
return ForwardAll.INSTANCE;
} else if (eventForwardingMode.equals(ForwardMatchingInstances.class)) {
return new ForwardMatchingInstances(eventRoutingKey, childEntityModel);
}
return ForwardNone.INSTANCE;
}

/**
Expand All @@ -162,47 +179,22 @@ protected abstract <T> Object resolveCommandTarget(CommandMessage<?> msg,
EntityModel<Object> childEntityModel);

/**
* Resolve the targets of an incoming {@link org.axonframework.eventhandling.EventMessage} to the right Child
* Entities. Returns a {@link java.util.stream.Stream} of all the Child Entities the Event Message could be
* * Resolve the targets of an incoming {@link org.axonframework.eventhandling.EventMessage} to the right Child
* Entities. Returns a {@link java.util.stream.Stream} of all the Child Entities the Event Message should be
* routed to.
*
* @param <T> The type {@code T} of the given {@code parent} Entity.
* @param parentEntity The {@code parent} Entity of type {@code T} of this Child Entity.
* @param field The {@link Field} containing the Child Entity.
* @param message The {@link org.axonframework.eventhandling.EventMessage} to route
* @param parentEntity The {@code parent} Entity of type {@code T} of this Child Entity.
* @param field The {@link Field} containing the Child Entity.
* @param eventForwardingMode The {@link org.axonframework.commandhandling.model.ForwardingMode} used for the {@code
* message} to route.
* @param <T> The type {@code T} of the given {@code parent} Entity.
* @return A {@link java.util.stream.Stream} of Child Entities which might be the targets of the incoming
* {@link org.axonframework.eventhandling.EventMessage}.
*/
protected abstract <T> Stream<Object> resolveEventTarget(EventMessage msg,
protected abstract <T> Stream<Object> resolveEventTarget(EventMessage message,
T parentEntity,
Field field,
ForwardingMode eventForwardingMode,
String eventRoutingKey,
EntityModel childEntity);

@SuppressWarnings("unchecked")
protected <C> boolean filterTarget(EventMessage msg,
C target,
ForwardingMode eventForwardingMode,
String eventRoutingKey,
EntityModel childEntity) {
if (eventForwardingMode == ForwardingMode.NONE) {
return false;
} else if (eventForwardingMode == ForwardingMode.ALL) {
return true;
} else if (eventForwardingMode == ForwardingMode.ROUTING_KEY) {
Property eventRoutingProperty =
getProperty(msg.getPayloadType(), eventRoutingKey(eventRoutingKey, childEntity));

Object eventRoutingValue = eventRoutingProperty.getValue(msg.getPayload());
Object entityIdentifier = childEntity.getIdentifier(target);

return Objects.equals(eventRoutingValue, entityIdentifier);
}
return false;
}

private String eventRoutingKey(String eventRoutingKey, EntityModel childEntity) {
return Objects.equals(eventRoutingKey, "") ? childEntity.routingKey() : eventRoutingKey;
}
ForwardingMode eventForwardingMode);
}

Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,14 @@ protected <T> Object resolveCommandTarget(CommandMessage<?> msg,
.orElse(null);
}

@SuppressWarnings("unchecked")
@Override
protected <T> Stream<Object> resolveEventTarget(EventMessage msg,
protected <T> Stream<Object> resolveEventTarget(EventMessage message,
T parentEntity,
Field field,
ForwardingMode eventForwardingMode,
String eventRoutingKey,
EntityModel childEntity) {
ForwardingMode eventForwardingMode) {
Collection<Object> fieldValue = ReflectionUtils.getFieldValue(field, parentEntity);
Stream<Object> eventTargetStream = fieldValue == null ? Stream.empty() : fieldValue.stream();
return eventTargetStream.filter(
target -> filterTarget(msg, target, eventForwardingMode, eventRoutingKey, childEntity)
);
return eventTargetStream.filter(target -> eventForwardingMode.forwardMessage(message, target));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,14 @@ protected <T> Object resolveCommandTarget(CommandMessage<?> msg,
return ReflectionUtils.getFieldValue(field, parent);
}

@SuppressWarnings("unchecked")
@Override
protected <T> Stream<Object> resolveEventTarget(EventMessage msg,
protected <T> Stream<Object> resolveEventTarget(EventMessage message,
T parentEntity,
Field field,
ForwardingMode eventForwardingMode,
String eventRoutingKey,
EntityModel childEntity) {
ForwardingMode eventForwardingMode) {
Object fieldVal = ReflectionUtils.getFieldValue(field, parentEntity);
Stream<Object> eventTargetStream = fieldVal == null ? Stream.empty() : Stream.of(fieldVal);
return eventTargetStream.filter(
target -> filterTarget(msg, target, eventForwardingMode, eventRoutingKey, childEntity)
);
return eventTargetStream.filter(target -> eventForwardingMode.forwardMessage(message, target));
}
}
Loading

0 comments on commit 469e1e5

Please sign in to comment.