Permalink
Browse files

It is now allows to call apply() from an @EventSourcingHandler

This allows for easier event handling in more complex aggregate structures,
where events applied by a entity can be handled and cause new events in
another entity.

Issue #AXON-236 Fixed
  • Loading branch information...
1 parent 82e9b54 commit 5ac5682dc9aae47af4faefd2b92ac62f215b4ba9 @abuijze abuijze committed Jun 2, 2014
@@ -23,6 +23,8 @@
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
+import java.util.ArrayDeque;
+import java.util.Queue;
import javax.persistence.MappedSuperclass;
/**
@@ -39,6 +41,10 @@
implements EventSourcedAggregateRoot<I> {
private static final long serialVersionUID = 5868786029296883724L;
+ private transient boolean inReplay = false;
+
+ private transient boolean applyingEvents = false;
+ private transient Queue<PayloadAndMetaData> eventsToApply = new ArrayDeque<PayloadAndMetaData>();
/**
* {@inheritDoc}
@@ -54,13 +60,15 @@
@Override
public void initializeState(DomainEventStream domainEventStream) {
Assert.state(getUncommittedEventCount() == 0, "Aggregate is already initialized");
+ inReplay = true;
long lastSequenceNumber = -1;
while (domainEventStream.hasNext()) {
DomainEventMessage event = domainEventStream.next();
lastSequenceNumber = event.getSequenceNumber();
handleRecursively(event);
}
initializeEventStream(lastSequenceNumber);
+ inReplay = false;
}
/**
@@ -85,7 +93,15 @@ protected void apply(Object eventPayload) {
* @param metaData any meta-data that must be registered with the Event
*/
protected void apply(Object eventPayload, MetaData metaData) {
+ if (inReplay) {
+ return;
+ }
+ // ensure that nested invocations know they are nested
+ boolean wasNested = applyingEvents;
+ applyingEvents = true;
if (getIdentifier() == null) {
+ Assert.state(!wasNested, "Applying an event in an @EventSourcingHandler is allowed, but only *after* the "
+ + "aggregate identifier has been set");
// workaround for aggregates that set the aggregate identifier in an Event Handler
if (getUncommittedEventCount() > 0 || getVersion() != null) {
throw new IncompatibleAggregateException("The Aggregate Identifier has not been initialized. "
@@ -95,9 +111,36 @@ protected void apply(Object eventPayload, MetaData metaData) {
handleRecursively(new GenericDomainEventMessage<Object>(null, 0, eventPayload, metaData));
registerEvent(metaData, eventPayload);
} else {
- DomainEventMessage event = registerEvent(metaData, eventPayload);
- handleRecursively(event);
+ // eventsToApply may heb been set to null by serialization
+ if (eventsToApply == null) {
+ eventsToApply = new ArrayDeque<PayloadAndMetaData>();
+ }
+ eventsToApply.add(new PayloadAndMetaData(eventPayload, metaData));
}
+
+ while (!wasNested && eventsToApply != null && !eventsToApply.isEmpty()) {
+ final PayloadAndMetaData payloadAndMetaData = eventsToApply.poll();
+ handleRecursively(registerEvent(payloadAndMetaData.metaData, payloadAndMetaData.payload));
+ }
+ applyingEvents = wasNested;
+ }
+
+ /**
+ * Indicates whether this aggregate is in "live" mode. This is the case when an aggregate is fully initialized and
+ * ready to handle commands.
+ * <p/>
+ * Typically, this method is used to check the state of the aggregate while events are being handled. When the
+ * aggregate is handling an event to reconstruct its current state, <code>isLive()</code> returns
+ * <code>false</code>. If an event is being handled because is was applied as a result of the current command being
+ * executed, it returns <code>true</code>.
+ * <p/>
+ * <code>isLive()</code> can be used to prevent expensive calculations while event sourcing.
+ *
+ * @return <code>true</code> if the aggregate is live, <code>false</code> when the aggregate is relaying historic
+ * events.
+ */
+ protected boolean isLive() {
+ return !inReplay;
}
private void handleRecursively(DomainEventMessage event) {
@@ -136,4 +179,15 @@ private void handleRecursively(DomainEventMessage event) {
public Long getVersion() {
return getLastCommittedEventSequenceNumber();
}
+
+ private static class PayloadAndMetaData {
+
+ private final Object payload;
+ private final MetaData metaData;
+
+ private PayloadAndMetaData(Object payload, MetaData metaData) {
+ this.payload = payload;
+ this.metaData = metaData;
+ }
+ }
}
@@ -16,14 +16,16 @@
package org.axonframework.eventsourcing.annotation;
-import org.axonframework.common.annotation.ParameterResolver;
-import org.axonframework.common.annotation.ParameterResolverFactory;
-import org.axonframework.domain.Message;
-import org.axonframework.domain.StubDomainEvent;
-import org.axonframework.eventhandling.annotation.EventHandler;
+import org.axonframework.domain.DomainEventStream;
+import org.axonframework.domain.GenericDomainEventMessage;
+import org.axonframework.domain.SimpleDomainEventStream;
+import org.axonframework.serializer.SerializedObject;
+import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
-import java.lang.annotation.Annotation;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import javax.persistence.Id;
@@ -41,14 +43,27 @@ public void testApplyEvent() {
testSubject = new SimpleAggregateRoot();
assertNotNull(testSubject.getIdentifier());
- assertEquals(1, testSubject.getUncommittedEventCount());
+ // the first applied event applies another one
+ assertEquals(2, testSubject.getUncommittedEventCount());
// this proves that a newly added entity is also notified of an event
- assertEquals(1, testSubject.getEntity().invocationCount);
+ assertEquals(2, testSubject.getEntity().invocationCount);
testSubject.doSomething();
- assertEquals(2, testSubject.invocationCount);
- assertEquals(2, testSubject.getEntity().invocationCount);
+ assertEquals(3, testSubject.invocationCount);
+ assertEquals(3, testSubject.getEntity().invocationCount);
+
+ // the nested handler must be invoked second
+ assertFalse(testSubject.entity.appliedEvents.get(0).nested);
+ assertTrue(testSubject.entity.appliedEvents.get(1).nested);
+ assertFalse(testSubject.entity.appliedEvents.get(2).nested);
+
+ DomainEventStream uncommittedEvents = testSubject.getUncommittedEvents();
+ int i=0;
+ while (uncommittedEvents.hasNext()) {
+ assertSame(testSubject.entity.appliedEvents.get(i), uncommittedEvents.next().getPayload());
+ i++;
+ }
}
@Test
@@ -62,6 +77,10 @@ public void testIdentifierInitialization_LateInitialization() {
LateIdentifiedAggregate aggregate = new LateIdentifiedAggregate();
assertEquals("lateIdentifier", aggregate.getIdentifier());
assertEquals("lateIdentifier", aggregate.getUncommittedEvents().peek().getAggregateIdentifier());
+
+ DomainEventStream uncommittedEvents = aggregate.getUncommittedEvents();
+ assertFalse(((StubDomainEvent)uncommittedEvents.next().getPayload()).nested);
+ assertTrue(((StubDomainEvent)uncommittedEvents.next().getPayload()).nested);
}
@Test
@@ -71,18 +90,54 @@ public void testIdentifierInitialization_JavaxPersistenceId() {
assertEquals("lateIdentifier", aggregate.getUncommittedEvents().peek().getAggregateIdentifier());
}
+ @Test
+ public void testSerializationSetsLiveStateToTrue() throws Exception {
+ LateIdentifiedAggregate aggregate = new LateIdentifiedAggregate();
+ aggregate.commitEvents();
+ final XStreamSerializer serializer = new XStreamSerializer();
+ SerializedObject<String> serialized = serializer.serialize(aggregate, String.class);
+
+ LateIdentifiedAggregate deserializedAggregate = serializer.deserialize(serialized);
+ assertTrue(deserializedAggregate.isLive());
+ }
+
+ @Test
+ public void testEventNotAppliedInReplayMode() {
+ final UUID id = UUID.randomUUID();
+ testSubject = new SimpleAggregateRoot(id);
+ testSubject.initializeState(new SimpleDomainEventStream(
+ new GenericDomainEventMessage<StubDomainEvent>(id.toString(), 0, new StubDomainEvent(false)),
+ new GenericDomainEventMessage<StubDomainEvent>(id.toString(), 1, new StubDomainEvent(true))));
+
+ assertEquals(0, testSubject.getUncommittedEventCount());
+ assertEquals((Long) 1L, testSubject.getVersion());
+ assertEquals(2, testSubject.invocationCount);
+
+ // the nested handler must be invoked second
+ assertFalse(testSubject.entity.appliedEvents.get(0).nested);
+ assertTrue(testSubject.entity.appliedEvents.get(1).nested);
+ }
+
private static class LateIdentifiedAggregate extends AbstractAnnotatedAggregateRoot {
@AggregateIdentifier
private String aggregateIdentifier;
private LateIdentifiedAggregate() {
- apply(new StubDomainEvent());
+ apply(new StubDomainEvent(false));
+ }
+
+ @Override
+ public boolean isLive() {
+ return super.isLive();
}
@EventSourcingHandler
public void myEventHandlerMethod(StubDomainEvent event) {
aggregateIdentifier = "lateIdentifier";
+ if (!event.nested) {
+ apply(new StubDomainEvent(true));
+ }
}
}
@@ -92,7 +147,7 @@ public void myEventHandlerMethod(StubDomainEvent event) {
private String aggregateIdentifier;
private JavaxPersistenceIdIdentifiedAggregate() {
- apply(new StubDomainEvent());
+ apply(new StubDomainEvent(false));
}
@EventSourcingHandler
@@ -111,7 +166,7 @@ public void myEventHandlerMethod(StubDomainEvent event) {
private SimpleAggregateRoot() {
identifier = UUID.randomUUID();
- apply(new StubDomainEvent());
+ apply(new StubDomainEvent(false));
}
private SimpleAggregateRoot(UUID identifier) {
@@ -123,6 +178,7 @@ public void myEventHandlerMethod(StubDomainEvent event) {
this.invocationCount++;
if (entity == null) {
entity = new SimpleEntity();
+ apply(new StubDomainEvent(true));
}
}
@@ -131,73 +187,35 @@ public SimpleEntity getEntity() {
}
public void doSomething() {
- apply(new StubDomainEvent());
- }
- }
-
- private static class CustomParameterHandlerAggregateRoot extends SimpleAggregateRoot {
-
- private CustomParameterHandlerAggregateRoot() {
- super();
- }
-
- private CustomParameterHandlerAggregateRoot(UUID identifier) {
- super(identifier);
- }
-
- @EventHandler // the legacy annotation must still work
- public void myEventHandlerMethod(StubDomainEvent event, SomeResource resource) {
- super.myEventHandlerMethod(event);
- resource.registerInvocation();
- }
-
- @Override
- public void doSomething() {
- apply(new StubDomainEvent());
+ apply(new StubDomainEvent(false));
}
}
- interface SomeResource {
-
- void registerInvocation();
- }
-
private static class SimpleEntity extends AbstractAnnotatedEntity {
private int invocationCount;
+ private List<StubDomainEvent> appliedEvents = new ArrayList<StubDomainEvent>();
@EventSourcingHandler
public void myEventHandlerMethod(StubDomainEvent event) {
this.invocationCount++;
+ appliedEvents.add(event);
}
}
- private static class SimpleParameterResolverFactory implements ParameterResolverFactory, ParameterResolver<Object> {
+ private static class StubDomainEvent implements Serializable {
- private final Object resource;
+ private static final long serialVersionUID = 834667054977749990L;
- public SimpleParameterResolverFactory(Object resource) {
- this.resource = resource;
- }
+ private final boolean nested;
- @Override
- public ParameterResolver createInstance(Annotation[] memberAnnotations,
- Class<?> parameterType,
- Annotation[] parameterAnnotations) {
- if (parameterType.isInstance(resource)) {
- return this;
- }
- return null;
- }
-
- @Override
- public Object resolveParameterValue(Message message) {
- return resource;
+ private StubDomainEvent(boolean nested) {
+ this.nested = nested;
}
@Override
- public boolean matches(Message message) {
- return true;
+ public String toString() {
+ return "StubDomainEvent";
}
}
}

0 comments on commit 5ac5682

Please sign in to comment.