Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 4 commits
  • 9 files changed
  • 0 comments
  • 2 contributors
Apr 13, 2012
Frank Versnel fversnel Fixed typo in UnitOfWorkListener. 71ae784
Frank Versnel fversnel Unit of work listeners are now handled in a specific order of precede…
…nce.

Introduced a new class which sole purpose it is to handle the order of
precedence in which unit of work listeners are called during the specific
stages of a unit of work. For example during the prepare commit stage the
listeners are handled in the order they are registered in the unit of work.
While during the after commit stage the order of precedence is reversed.
This change was introduced to further refine the behaviour in which unit of
work listeners are expected to function. This is handy, for example, when
using the AuditingListener which can perform audits before any transaction is
started and should only write an audit entry when the transaction has already
been successfully committed.
333bb43
Apr 17, 2012
Allard Buijze abuijze Added Reader and Writer for streaming of Event Messages
The reader and writer can be used to stream multiple messages to a single
byte stream, for example in the case of a FileSystem based Event Store, or
for messaging between systems.
9e75d9e
Allard Buijze abuijze Merge branch 'unit-of-work-listener-precedence' da5c4c7
80 core/src/main/java/org/axonframework/io/EventMessageReader.java
... ... @@ -0,0 +1,80 @@
  1 +package org.axonframework.io;
  2 +
  3 +import org.axonframework.domain.EventMessage;
  4 +import org.axonframework.serializer.SerializedDomainEventMessage;
  5 +import org.axonframework.serializer.SerializedEventMessage;
  6 +import org.axonframework.serializer.SerializedMetaData;
  7 +import org.axonframework.serializer.Serializer;
  8 +import org.axonframework.serializer.SimpleSerializedObject;
  9 +import org.joda.time.DateTime;
  10 +
  11 +import java.io.DataInput;
  12 +import java.io.IOException;
  13 +
  14 +/**
  15 + * Reader that reads EventMessage instances written to the underlying input. Typically, these messages have been
  16 + * written using a {@link EventMessageWriter}. This reader distinguishes between DomainEventMessage and regular
  17 + * EventMessage implementations and will reconstruct an instance implementing that same interface when reading.
  18 + *
  19 + * @author Allard Buijze
  20 + * @since 2.0
  21 + */
  22 +public class EventMessageReader {
  23 +
  24 + private final Serializer serializer;
  25 + private final DataInput in;
  26 +
  27 + /**
  28 + * Creates a new EventMessageReader that reads the data from the given <code>input</code> and deserializes payload
  29 + * and meta data using the given <code>serializer</code>.
  30 + *
  31 + * @param input The input providing access to the written data
  32 + * @param serializer The serializer to deserialize payload and meta data with
  33 + */
  34 + public EventMessageReader(DataInput input, Serializer serializer) {
  35 + this.in = input;
  36 + this.serializer = serializer;
  37 + }
  38 +
  39 + /**
  40 + * Reads an EventMessage from the underlying input. If the written event was a DomainEventMessage, an instance of
  41 + * DomainEventMessage is returned.
  42 + *
  43 + * @param <T> The type of payload expected to be in the returned EventMessage. This is not checked at runtime!
  44 + * @return an EventMessage representing the message originally written.
  45 + *
  46 + * @throws IOException when an error occurs reading from the underlying input
  47 + * @throws java.io.EOFException when the end of the stream was reached before the message was entirely read
  48 + */
  49 + public <T> EventMessage<T> readEventMessage() throws IOException {
  50 + EventMessageType messageType = EventMessageType.fromTypeByte(in.readByte());
  51 + String identifier = in.readUTF();
  52 + String timestamp = in.readUTF();
  53 + String aggregateIdentifier = null;
  54 + long sequenceNumber = 0;
  55 + if (messageType == EventMessageType.DOMAIN_EVENT_MESSAGE) {
  56 + aggregateIdentifier = in.readUTF();
  57 + sequenceNumber = in.readLong();
  58 + }
  59 + String payloadType = in.readUTF();
  60 + String payloadRevision = in.readUTF();
  61 + byte[] payload = new byte[in.readInt()];
  62 + in.readFully(payload);
  63 + int metaDataSize = in.readInt();
  64 + byte[] metaData = new byte[metaDataSize];
  65 + in.readFully(metaData);
  66 + SimpleSerializedObject<byte[]> serializedPayload = new SimpleSerializedObject<byte[]>(payload,
  67 + byte[].class,
  68 + payloadType,
  69 + payloadRevision);
  70 + SerializedMetaData<byte[]> serializedMetaData = new SerializedMetaData<byte[]>(metaData, byte[].class);
  71 +
  72 + SerializedEventMessage<T> message = new SerializedEventMessage<T>(identifier, new DateTime(timestamp),
  73 + serializedPayload, serializedMetaData,
  74 + serializer);
  75 + if (messageType == EventMessageType.DOMAIN_EVENT_MESSAGE) {
  76 + return new SerializedDomainEventMessage<T>(message, aggregateIdentifier, sequenceNumber);
  77 + }
  78 + return message;
  79 + }
  80 +}
70 core/src/main/java/org/axonframework/io/EventMessageType.java
... ... @@ -0,0 +1,70 @@
  1 +package org.axonframework.io;
  2 +
  3 +import org.axonframework.domain.DomainEventMessage;
  4 +import org.axonframework.domain.EventMessage;
  5 +
  6 +/**
  7 + * Enumeration of supported Message Types by the {@link EventMessageWriter} and {@link EventMessageReader}.
  8 + *
  9 + * @author Allard Buijze
  10 + * @since 2.0
  11 + */
  12 +public enum EventMessageType {
  13 +
  14 + /**
  15 + * Represents a DomainEventMessage
  16 + */
  17 + DOMAIN_EVENT_MESSAGE((byte) 3, DomainEventMessage.class),
  18 +
  19 + /**
  20 + * Represents an EventMessage which is not a DomainEventMessage
  21 + */
  22 + EVENT_MESSAGE((byte) 1, EventMessage.class);
  23 +
  24 + private final byte typeByte;
  25 + private final Class<? extends EventMessage> messageClass;
  26 +
  27 + /**
  28 + * Returns the most specific EventMessageType for the given <code>message</code>.
  29 + *
  30 + * @param message The message to resolve the type for
  31 + * @return The EventMessageType for the given <code>message</code>
  32 + */
  33 + public static EventMessageType forMessage(EventMessage message) {
  34 + for (EventMessageType type : EventMessageType.values()) {
  35 + if (type.messageClass.isInstance(message)) {
  36 + return type;
  37 + }
  38 + }
  39 + return EVENT_MESSAGE;
  40 + }
  41 +
  42 + /**
  43 + * Returns the EventMessageType identified by the given <code>typeByte</code>.
  44 + *
  45 + * @param typeByte The byte representing the EventMessageType
  46 + * @return the EventMessageType represented by the typeByte, or <code>null</code> if unknown
  47 + */
  48 + public static EventMessageType fromTypeByte(byte typeByte) {
  49 + for (EventMessageType type : EventMessageType.values()) {
  50 + if (type.typeByte == typeByte) {
  51 + return type;
  52 + }
  53 + }
  54 + return null;
  55 + }
  56 +
  57 + private EventMessageType(byte typeByte, Class<? extends EventMessage> messageClass) {
  58 + this.typeByte = typeByte;
  59 + this.messageClass = messageClass;
  60 + }
  61 +
  62 + /**
  63 + * Returns the Type Byte for this EventMessageType.
  64 + *
  65 + * @return the byte representing this EventMessageType
  66 + */
  67 + public byte getTypeByte() {
  68 + return typeByte;
  69 + }
  70 +}
64 core/src/main/java/org/axonframework/io/EventMessageWriter.java
... ... @@ -0,0 +1,64 @@
  1 +package org.axonframework.io;
  2 +
  3 +
  4 +import org.axonframework.domain.DomainEventMessage;
  5 +import org.axonframework.domain.EventMessage;
  6 +import org.axonframework.serializer.SerializedObject;
  7 +import org.axonframework.serializer.Serializer;
  8 +
  9 +import java.io.DataOutput;
  10 +import java.io.IOException;
  11 +
  12 +/**
  13 + * Writer that writes Event Messages onto a an OutputStream. The format of the message makes them suitable to be read
  14 + * back in using a {@link EventMessageReader}. This writer distinguishes between DomainEventMessage and plain
  15 + * EventMessage when writing. The reader will reconstruct an aggregate implementation for the same message type (i.e.
  16 + * DomainEventMessage or EventMessage).
  17 + *
  18 + * @author Allard Buijze
  19 + * @since 2.0
  20 + */
  21 +public class EventMessageWriter {
  22 +
  23 + private final Serializer serializer;
  24 + private final DataOutput out;
  25 +
  26 + /**
  27 + * Creates a new EventMessageWriter writing data to the specified underlying <code>output</code>.
  28 + *
  29 + * @param output the underlying output
  30 + * @param serializer The serializer to deserialize payload and metadata with
  31 + */
  32 + public EventMessageWriter(DataOutput output, Serializer serializer) {
  33 + this.out = output;
  34 + this.serializer = serializer;
  35 + }
  36 +
  37 + /**
  38 + * Writes the given <code>eventMessage</code> to the underling output.
  39 + *
  40 + * @param eventMessage the EventMessage to write to the underlying output
  41 + * @throws IOException when any exception occurs writing to the underlying stream
  42 + */
  43 + public void writeEventMessage(EventMessage eventMessage) throws IOException {
  44 + EventMessageType messageType = EventMessageType.forMessage(eventMessage);
  45 + out.writeByte(messageType.getTypeByte());
  46 + out.writeUTF(eventMessage.getIdentifier());
  47 + out.writeUTF(eventMessage.getTimestamp().toString());
  48 + if (messageType == EventMessageType.DOMAIN_EVENT_MESSAGE) {
  49 + DomainEventMessage domainEventMessage = (DomainEventMessage) eventMessage;
  50 + out.writeUTF(domainEventMessage.getAggregateIdentifier().toString());
  51 + out.writeLong(domainEventMessage.getSequenceNumber());
  52 + }
  53 + SerializedObject<byte[]> serializedPayload = serializer.serialize(eventMessage.getPayload(), byte[].class);
  54 + SerializedObject<byte[]> serializedMetaData = serializer.serialize(eventMessage.getMetaData(), byte[].class);
  55 +
  56 + out.writeUTF(serializedPayload.getType().getName());
  57 + String revision = serializedPayload.getType().getRevision();
  58 + out.writeUTF(revision == null ? "" : revision);
  59 + out.writeInt(serializedPayload.getData().length);
  60 + out.write(serializedPayload.getData());
  61 + out.writeInt(serializedMetaData.getData().length);
  62 + out.write(serializedMetaData.getData());
  63 + }
  64 +}
16 core/src/main/java/org/axonframework/serializer/SerializedDomainEventMessage.java
@@ -60,6 +60,22 @@ public SerializedDomainEventMessage(SerializedDomainEventData domainEventData, S
60 60 sequenceNumber = domainEventData.getSequenceNumber();
61 61 }
62 62
  63 + /**
  64 + * Wrapper constructor for wrapping a SerializedEventMessage as a SerializedDomainEventMessage, using given
  65 + * <code>aggregateIdentifier</code> and <code>sequenceNumber</code>. This constructor should be used to reconstruct
  66 + * an instance of an existing serialized Domain Event Message
  67 + *
  68 + * @param eventMessage The eventMessage to wrap
  69 + * @param aggregateIdentifier The identifier of the aggregate that generated the message
  70 + * @param sequenceNumber The sequence number of the generated event
  71 + */
  72 + public SerializedDomainEventMessage(SerializedEventMessage<T> eventMessage, Object aggregateIdentifier,
  73 + long sequenceNumber) {
  74 + this.eventMessage = eventMessage;
  75 + this.aggregateIdentifier = aggregateIdentifier;
  76 + this.sequenceNumber = sequenceNumber;
  77 + }
  78 +
63 79 private SerializedDomainEventMessage(SerializedDomainEventMessage<T> original, Map<String, Object> metaData) {
64 80 eventMessage = original.eventMessage.withMetaData(metaData);
65 81 this.aggregateIdentifier = original.getAggregateIdentifier();
56 core/src/main/java/org/axonframework/unitofwork/DefaultUnitOfWork.java
@@ -27,13 +27,11 @@
27 27
28 28 import java.util.ArrayList;
29 29 import java.util.Collections;
30   -import java.util.HashSet;
31 30 import java.util.LinkedHashMap;
32 31 import java.util.LinkedList;
33 32 import java.util.List;
34 33 import java.util.Map;
35 34 import java.util.Queue;
36   -import java.util.Set;
37 35
38 36 /**
39 37 * Implementation of the UnitOfWork that buffers all published events until it is committed. Aggregates that have not
@@ -48,10 +46,9 @@
48 46
49 47 private static final Logger logger = LoggerFactory.getLogger(DefaultUnitOfWork.class);
50 48
51   - private final Map<AggregateRoot, AggregateEntry> registeredAggregates =
52   - new LinkedHashMap<AggregateRoot, AggregateEntry>();
  49 + private final Map<AggregateRoot, AggregateEntry> registeredAggregates = new LinkedHashMap<AggregateRoot, AggregateEntry>();
53 50 private final Queue<EventEntry> eventsToPublish = new LinkedList<EventEntry>();
54   - private final Set<UnitOfWorkListener> listeners = new HashSet<UnitOfWorkListener>();
  51 + private final UnitOfWorkListenerCollection listeners = new UnitOfWorkListenerCollection();
55 52 private Status dispatcherStatus = Status.READY;
56 53
57 54 private static enum Status {
@@ -84,9 +81,7 @@ protected void doRollback(Throwable cause) {
84 81 @Override
85 82 protected void doCommit() {
86 83 publishEvents();
87   -
88 84 commitInnerUnitOfWork();
89   -
90 85 notifyListenersAfterCommit();
91 86 }
92 87
@@ -121,10 +116,7 @@ protected void doCommit() {
121 116 }
122 117
123 118 private <T> EventMessage<T> invokeEventRegistrationListeners(EventMessage<T> event) {
124   - for (UnitOfWorkListener listener : listeners) {
125   - event = listener.onEventRegistered(event);
126   - }
127   - return event;
  119 + return listeners.onEventRegistered(event);
128 120 }
129 121
130 122 @SuppressWarnings({"unchecked"})
@@ -140,9 +132,6 @@ protected void doCommit() {
140 132
141 133 @Override
142 134 public void registerListener(UnitOfWorkListener listener) {
143   - if (logger.isDebugEnabled()) {
144   - logger.debug("Registering listener: {}", listener.getClass().getName());
145   - }
146 135 listeners.add(listener);
147 136 }
148 137
@@ -163,26 +152,14 @@ public void publishEvent(EventMessage event, EventBus eventBus) {
163 152
164 153 @Override
165 154 protected void notifyListenersRollback(Throwable cause) {
166   - logger.debug("Notifying listeners of rollback");
167   - for (UnitOfWorkListener listener : listeners) {
168   - if (logger.isDebugEnabled()) {
169   - logger.debug("Notifying listener [{}] of rollback", listener.getClass().getName());
170   - }
171   - listener.onRollback(cause);
172   - }
  155 + listeners.onRollback(cause);
173 156 }
174 157
175 158 /**
176 159 * Send a {@link UnitOfWorkListener#afterCommit()} notification to all registered listeners.
177 160 */
178 161 protected void notifyListenersAfterCommit() {
179   - logger.debug("Notifying listeners after commit");
180   - for (UnitOfWorkListener listener : listeners) {
181   - if (logger.isDebugEnabled()) {
182   - logger.debug("Notifying listener [{}] after commit", listener.getClass().getName());
183   - }
184   - listener.afterCommit();
185   - }
  162 + listeners.afterCommit();
186 163 }
187 164
188 165 /**
@@ -227,31 +204,12 @@ protected void saveAggregates() {
227 204
228 205 @Override
229 206 protected void notifyListenersPrepareCommit() {
230   - logger.debug("Notifying listeners of commit request");
231   - List<EventMessage> events = eventsToPublish();
232   - for (UnitOfWorkListener listener : listeners) {
233   - if (logger.isDebugEnabled()) {
234   - logger.debug("Notifying listener [{}] of upcoming commit", listener.getClass().getName());
235   - }
236   - listener.onPrepareCommit(registeredAggregates.keySet(), events);
237   - }
238   - logger.debug("Listeners successfully notified");
  207 + listeners.onPrepareCommit(registeredAggregates.keySet(), eventsToPublish());
239 208 }
240 209
241 210 @Override
242 211 protected void notifyListenersCleanup() {
243   - logger.debug("Notifying listeners of cleanup");
244   - for (UnitOfWorkListener listener : listeners) {
245   - try {
246   - if (logger.isDebugEnabled()) {
247   - logger.debug("Notifying listener [{}] of cleanup", listener.getClass().getName());
248   - }
249   - listener.onCleanup();
250   - } catch (RuntimeException e) {
251   - logger.warn("Listener raised an exception on cleanup. Ignoring...", e);
252   - }
253   - }
254   - logger.debug("Listeners successfully notified");
  212 + listeners.onCleanup();
255 213 }
256 214
257 215 private List<EventMessage> eventsToPublish() {
2  core/src/main/java/org/axonframework/unitofwork/UnitOfWorkListener.java
@@ -57,7 +57,7 @@
57 57 *
58 58 * @param event The event about to be registered for publication
59 59 * @param <T> The type of payload of the EventMessage
60   - * @return the (modified) event to register for publicatino
  60 + * @return the (modified) event to register for publication
61 61 */
62 62 <T> EventMessage<T> onEventRegistered(EventMessage<T> event);
63 63
134 core/src/main/java/org/axonframework/unitofwork/UnitOfWorkListenerCollection.java
... ... @@ -0,0 +1,134 @@
  1 +package org.axonframework.unitofwork;
  2 +
  3 +import org.axonframework.domain.AggregateRoot;
  4 +import org.axonframework.domain.EventMessage;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +
  8 +import java.util.ArrayDeque;
  9 +import java.util.Deque;
  10 +import java.util.Iterator;
  11 +import java.util.List;
  12 +import java.util.Set;
  13 +
  14 +/**
  15 + * This class is responsible for notifying registered listeners in a specific order of precedence.
  16 + * When {@link #onPrepareCommit(java.util.Set, java.util.List)}} and
  17 + * {@link #onEventRegistered(org.axonframework.domain.EventMessage)} are called the listeners will be handled
  18 + * in the order they have been registered (order of precedence).
  19 + * When {@link #afterCommit()}, {@link #onRollback(Throwable)}, and {@link #onCleanup()} are called
  20 + * the listeners will be handled in the reversed order of precedence.
  21 + * <p/>
  22 + * This behaviour is particularly useful because the {@link org.axonframework.auditing.AuditingUnitOfWorkListener}
  23 + * should write an entry before any other listeners are allowed to anything (prepare commit)
  24 + * and write an entry only after all other listeners have successfully committed (after commit).
  25 + *
  26 + * @author Frank Versnel
  27 + * @since 2.0
  28 + */
  29 +public class UnitOfWorkListenerCollection implements UnitOfWorkListener {
  30 +
  31 + private static final Logger logger = LoggerFactory.getLogger(UnitOfWorkListenerCollection.class);
  32 +
  33 + private final Deque<UnitOfWorkListener> listeners = new ArrayDeque<UnitOfWorkListener>();
  34 +
  35 + /**
  36 + * {@inheritDoc}
  37 + * <p/>
  38 + * Listeners are called in the reversed order of precedence.
  39 + */
  40 + @Override
  41 + public void afterCommit() {
  42 + logger.debug("Notifying listeners after commit");
  43 + for (Iterator<UnitOfWorkListener> listenerIter = listeners.descendingIterator(); listenerIter.hasNext();) {
  44 + UnitOfWorkListener listener = listenerIter.next();
  45 + if (logger.isDebugEnabled()) {
  46 + logger.debug("Notifying listener [{}] after commit", listener.getClass().getName());
  47 + }
  48 + listener.afterCommit();
  49 + }
  50 + }
  51 +
  52 + /**
  53 + * {@inheritDoc}
  54 + * <p/>
  55 + * Listeners are called in the reversed order of precedence.
  56 + */
  57 + @Override
  58 + public void onRollback(Throwable failureCause) {
  59 + logger.debug("Notifying listeners of rollback");
  60 + for (Iterator<UnitOfWorkListener> listenerIter = listeners.descendingIterator(); listenerIter.hasNext();) {
  61 + UnitOfWorkListener listener = listenerIter.next();
  62 + if (logger.isDebugEnabled()) {
  63 + logger.debug("Notifying listener [{}] of rollback", listener.getClass().getName());
  64 + }
  65 + listener.onRollback(failureCause);
  66 + }
  67 + }
  68 +
  69 + /**
  70 + * {@inheritDoc}
  71 + * <p/>
  72 + * Listeners are called in the order of precedence.
  73 + */
  74 + @Override
  75 + public <T> EventMessage<T> onEventRegistered(EventMessage<T> event) {
  76 + EventMessage<T> newEvent = event;
  77 + for (UnitOfWorkListener listener : listeners) {
  78 + newEvent = listener.onEventRegistered(event);
  79 + }
  80 + return newEvent;
  81 + }
  82 +
  83 + /**
  84 + * {@inheritDoc}
  85 + * <p/>
  86 + * Listeners are called in the order of precedence.
  87 + */
  88 + @Override
  89 + public void onPrepareCommit(Set<AggregateRoot> aggregateRoots, List<EventMessage> events) {
  90 + logger.debug("Notifying listeners of commit request");
  91 + for (UnitOfWorkListener listener : listeners) {
  92 + if (logger.isDebugEnabled()) {
  93 + logger.debug("Notifying listener [{}] of upcoming commit", listener.getClass().getName());
  94 + }
  95 + listener.onPrepareCommit(aggregateRoots, events);
  96 + }
  97 + logger.debug("Listeners successfully notified");
  98 + }
  99 +
  100 + /**
  101 + * {@inheritDoc}
  102 + * <p/>
  103 + * Listeners are called in the reversed order of precedence.
  104 + */
  105 + @Override
  106 + public void onCleanup() {
  107 + logger.debug("Notifying listeners of cleanup");
  108 + for (Iterator<UnitOfWorkListener> listenerIter = listeners.descendingIterator(); listenerIter.hasNext();) {
  109 + UnitOfWorkListener listener = listenerIter.next();
  110 + try {
  111 + if (logger.isDebugEnabled()) {
  112 + logger.debug("Notifying listener [{}] of cleanup", listener.getClass().getName());
  113 + }
  114 + listener.onCleanup();
  115 + } catch (RuntimeException e) {
  116 + logger.warn("Listener raised an exception on cleanup. Ignoring...", e);
  117 + }
  118 + }
  119 + logger.debug("Listeners successfully notified");
  120 + }
  121 +
  122 + /**
  123 + * Adds a listener to the collection. Note that the order in which you register the listeners determines the order
  124 + * in which they will be handled during the various stages of a unit of work.
  125 + *
  126 + * @param listener the listener to be added
  127 + */
  128 + public void add(UnitOfWorkListener listener) {
  129 + if (logger.isDebugEnabled()) {
  130 + logger.debug("Registering listener: {}", listener.getClass().getName());
  131 + }
  132 + listeners.add(listener);
  133 + }
  134 +}
68 core/src/test/java/org/axonframework/io/MessageStreamTest.java
... ... @@ -0,0 +1,68 @@
  1 +package org.axonframework.io;
  2 +
  3 +import org.axonframework.domain.DomainEventMessage;
  4 +import org.axonframework.domain.EventMessage;
  5 +import org.axonframework.domain.GenericDomainEventMessage;
  6 +import org.axonframework.domain.GenericEventMessage;
  7 +import org.axonframework.serializer.xml.XStreamSerializer;
  8 +import org.junit.*;
  9 +
  10 +import java.io.ByteArrayInputStream;
  11 +import java.io.ByteArrayOutputStream;
  12 +import java.io.DataInputStream;
  13 +import java.io.DataOutputStream;
  14 +import java.util.Collections;
  15 +
  16 +import static org.junit.Assert.*;
  17 +
  18 +/**
  19 + * @author Allard Buijze
  20 + */
  21 +public class MessageStreamTest {
  22 +
  23 + @Test
  24 + public void testStreamEventMessage() throws Exception {
  25 + ByteArrayOutputStream baos = new ByteArrayOutputStream();
  26 + XStreamSerializer serializer = new XStreamSerializer();
  27 + EventMessageWriter out = new EventMessageWriter(new DataOutputStream(baos), serializer);
  28 + GenericEventMessage<String> message = new GenericEventMessage<String>("This is the payload",
  29 + Collections.<String, Object>singletonMap(
  30 + "metaKey",
  31 + "MetaValue"));
  32 + out.writeEventMessage(message);
  33 + EventMessageReader in = new EventMessageReader(new DataInputStream(
  34 + new ByteArrayInputStream(baos.toByteArray())), serializer);
  35 + EventMessage<Object> serializedMessage = in.readEventMessage();
  36 +
  37 + assertEquals(message.getIdentifier(), serializedMessage.getIdentifier());
  38 + assertEquals(message.getPayloadType(), serializedMessage.getPayloadType());
  39 + assertEquals(message.getTimestamp(), serializedMessage.getTimestamp());
  40 + assertEquals(message.getMetaData(), serializedMessage.getMetaData());
  41 + assertEquals(message.getPayload(), serializedMessage.getPayload());
  42 + }
  43 +
  44 + @Test
  45 + public void testStreamDomainEventMessage() throws Exception {
  46 + ByteArrayOutputStream baos = new ByteArrayOutputStream();
  47 + XStreamSerializer serializer = new XStreamSerializer();
  48 + EventMessageWriter out = new EventMessageWriter(new DataOutputStream(baos), serializer);
  49 + GenericDomainEventMessage<String> message = new GenericDomainEventMessage<String>(
  50 + "AggregateID", 1L, "This is the payload", Collections.<String, Object>singletonMap("metaKey",
  51 + "MetaValue"));
  52 + out.writeEventMessage(message);
  53 + EventMessageReader in = new EventMessageReader(
  54 + new DataInputStream(new ByteArrayInputStream(baos.toByteArray())), serializer);
  55 + EventMessage<Object> serializedMessage = in.readEventMessage();
  56 + assertTrue(serializedMessage instanceof DomainEventMessage);
  57 +
  58 + DomainEventMessage serializedDomainEventMessage = (DomainEventMessage) serializedMessage;
  59 +
  60 + assertEquals(message.getIdentifier(), serializedDomainEventMessage.getIdentifier());
  61 + assertEquals(message.getPayloadType(), serializedDomainEventMessage.getPayloadType());
  62 + assertEquals(message.getTimestamp(), serializedDomainEventMessage.getTimestamp());
  63 + assertEquals(message.getMetaData(), serializedDomainEventMessage.getMetaData());
  64 + assertEquals(message.getPayload(), serializedDomainEventMessage.getPayload());
  65 + assertEquals(message.getAggregateIdentifier(), serializedDomainEventMessage.getAggregateIdentifier());
  66 + assertEquals(message.getSequenceNumber(), serializedDomainEventMessage.getSequenceNumber());
  67 + }
  68 +}
31 disruptor-commandbus/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorUnitOfWork.java
@@ -28,6 +28,7 @@
28 28 import org.axonframework.unitofwork.SaveAggregateCallback;
29 29 import org.axonframework.unitofwork.UnitOfWork;
30 30 import org.axonframework.unitofwork.UnitOfWorkListener;
  31 +import org.axonframework.unitofwork.UnitOfWorkListenerCollection;
31 32
32 33 import java.util.ArrayList;
33 34 import java.util.Collections;
@@ -49,8 +50,8 @@
49 50 private DomainEventStream eventsToStore = EMPTY_DOMAIN_EVENT_STREAM;
50 51
51 52 private final List<EventMessage> eventsToPublish = new ArrayList<EventMessage>();
  53 + private final UnitOfWorkListenerCollection listeners = new UnitOfWorkListenerCollection();
52 54 private EventSourcedAggregateRoot aggregate;
53   - private List<UnitOfWorkListener> listeners = new ArrayList<UnitOfWorkListener>();
54 55
55 56 /**
56 57 * Initializes a UnitOfWork for execution of a command on the given <code>aggregate</code>.
@@ -74,9 +75,7 @@ public void commit() {
74 75 * committed, but before any of the changes are made public.
75 76 */
76 77 public void onPrepareCommit() {
77   - for (UnitOfWorkListener listener : listeners) {
78   - listener.onPrepareCommit(Collections.<AggregateRoot>singleton(aggregate), eventsToPublish);
79   - }
  78 + listeners.onPrepareCommit(Collections.<AggregateRoot>singleton(aggregate), eventsToPublish);
80 79 }
81 80
82 81 /**
@@ -84,9 +83,7 @@ public void onPrepareCommit() {
84 83 * and published.
85 84 */
86 85 public void onAfterCommit() {
87   - for (UnitOfWorkListener listener : listeners) {
88   - listener.afterCommit();
89   - }
  86 + listeners.afterCommit();
90 87 }
91 88
92 89 /**
@@ -94,9 +91,7 @@ public void onAfterCommit() {
94 91 * published and the after-commit cycle has been executed.
95 92 */
96 93 public void onCleanup() {
97   - for (UnitOfWorkListener listener : listeners) {
98   - listener.onCleanup();
99   - }
  94 + listeners.onCleanup();
100 95 }
101 96
102 97 /**
@@ -106,9 +101,7 @@ public void onCleanup() {
106 101 * @param cause The cause of the rollback
107 102 */
108 103 public void onRollback(Throwable cause) {
109   - for (UnitOfWorkListener listener : listeners) {
110   - listener.onRollback(cause);
111   - }
  104 + listeners.onRollback(cause);
112 105 }
113 106
114 107 @Override
@@ -137,7 +130,7 @@ public boolean isStarted() {
137 130
138 131 @Override
139 132 public void registerListener(UnitOfWorkListener listener) {
140   - this.listeners.add(listener);
  133 + listeners.add(listener);
141 134 }
142 135
143 136 @SuppressWarnings("unchecked")
@@ -191,16 +184,8 @@ public EventSourcedAggregateRoot getAggregate() {
191 184
192 185 @Override
193 186 public <T> DomainEventMessage<T> onRegisteredEvent(DomainEventMessage<T> event) {
194   - DomainEventMessage<T> message = (DomainEventMessage<T>) processListeners(event);
  187 + DomainEventMessage<T> message = (DomainEventMessage<T>) listeners.onEventRegistered(event);
195 188 eventsToPublish.add(message);
196 189 return message;
197 190 }
198   -
199   - private <T> EventMessage<T> processListeners(DomainEventMessage<T> event) {
200   - EventMessage<T> message = event;
201   - for (UnitOfWorkListener listener : listeners) {
202   - message = listener.onEventRegistered(message);
203   - }
204   - return message;
205   - }
206 191 }

No commit comments for this range

Something went wrong with that request. Please try again.