Permalink
Browse files

Changed JPA DefaultEventEntryStore for easy configuration of custom e…

…ntity

It is now possible to customize the Event Entry entities used by the
DefaultEventEntryStore, by subclassing it and overriding the methods to
create entities.

Thanks to @Hippoom for providing the pull request which served as inspiration
to this solution.

Issue #AXON-211 Fixed
  • Loading branch information...
1 parent ca70797 commit 29525e8945abfdc4c5e2d58747dc75b56e375c23 @abuijze abuijze committed Apr 9, 2014
Showing with 95 additions and 19 deletions.
  1. +95 −19 core/src/main/java/org/axonframework/eventstore/jpa/DefaultEventEntryStore.java
@@ -51,7 +51,7 @@
@SuppressWarnings({"unchecked"})
public void persistEvent(String aggregateType, DomainEventMessage event, SerializedObject serializedPayload,
SerializedObject serializedMetaData, EntityManager entityManager) {
- entityManager.persist(new DomainEventEntry(aggregateType, event, serializedPayload, serializedMetaData));
+ entityManager.persist(createDomainEventEntry(aggregateType, event, serializedPayload, serializedMetaData));
}
@Override
@@ -62,7 +62,7 @@ public SimpleSerializedDomainEventData loadLastSnapshotEvent(String aggregateTyp
.createQuery("SELECT new org.axonframework.eventstore.jpa.SimpleSerializedDomainEventData("
+ "e.eventIdentifier, e.aggregateIdentifier, e.sequenceNumber, "
+ "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData) "
- + "FROM SnapshotEventEntry e "
+ + "FROM " + snapshotEventEntryEntityName() + " e "
+ "WHERE e.aggregateIdentifier = :id AND e.type = :type "
+ "ORDER BY e.sequenceNumber DESC")
.setParameter("id", identifier.toString())
@@ -81,15 +81,84 @@ public SimpleSerializedDomainEventData loadLastSnapshotEvent(String aggregateTyp
public Iterator<SerializedDomainEventData> fetchFiltered(String whereClause, Map<String, Object> parameters,
int batchSize,
EntityManager entityManager) {
- return new BatchingIterator(whereClause, parameters, batchSize, entityManager);
+ return new BatchingIterator(whereClause, parameters, batchSize, domainEventEntryEntityName(), entityManager);
}
@Override
public void persistSnapshot(String aggregateType, DomainEventMessage snapshotEvent,
SerializedObject serializedPayload, SerializedObject serializedMetaData,
EntityManager entityManager) {
- entityManager.persist(new SnapshotEventEntry(aggregateType, snapshotEvent, serializedPayload,
- serializedMetaData));
+ entityManager.persist(createSnapshotEventEntry(aggregateType,
+ snapshotEvent,
+ serializedPayload,
+ serializedMetaData));
+ }
+
+ /**
+ * Allows for customization of the DomainEventEntry to store. Subclasses may choose to override this method to
+ * use a different entity configuration.
+ * <p/>
+ * When overriding this method, also make sure the {@link #domainEventEntryEntityName()} method is overridden to
+ * return the correct entity name.
+ *
+ * @param aggregateType The type identifier of the aggregate
+ * @param event The event to be stored
+ * @param serializedPayload The serialized payload of the event
+ * @param serializedMetaData The serialized meta data of the event
+ * @return a JPA entity, ready to be stored using the entity manager
+ *
+ * @see #domainEventEntryEntityName()
+ */
+ protected DomainEventEntry createDomainEventEntry(String aggregateType, DomainEventMessage event,
+ SerializedObject<byte[]> serializedPayload,
+ SerializedObject<byte[]> serializedMetaData) {
+ return new DomainEventEntry(aggregateType, event, serializedPayload, serializedMetaData);
+ }
+
+ /**
+ * Allows for customization of the SnapshotEventEntry to store. Subclasses may choose to override this method to
+ * use a different entity configuration.
+ * <p/>
+ * When overriding this method, also make sure the {@link #snapshotEventEntryEntityName()} method is overridden to
+ * return the correct entity name.
+ *
+ * @param aggregateType The type identifier of the aggregate
+ * @param snapshotEvent The snapshot event to be stored
+ * @param serializedPayload The serialized payload of the event
+ * @param serializedMetaData The serialized meta data of the event
+ * @return a JPA entity, ready to be stored using the entity manager
+ *
+ * @see #snapshotEventEntryEntityName()
+ */
+ protected SnapshotEventEntry createSnapshotEventEntry(String aggregateType, DomainEventMessage snapshotEvent,
+ SerializedObject serializedPayload,
+ SerializedObject serializedMetaData) {
+ return new SnapshotEventEntry(aggregateType, snapshotEvent, serializedPayload,
+ serializedMetaData);
+ }
+
+ /**
+ * The name of the DomainEventEntry entity to use when querying for domain events.
+ *
+ * @return The entity name of the DomainEventEntry subclass to use
+ *
+ * @see #createDomainEventEntry(String, org.axonframework.domain.DomainEventMessage,
+ * org.axonframework.serializer.SerializedObject, org.axonframework.serializer.SerializedObject)
+ */
+ protected String domainEventEntryEntityName() {
+ return "DomainEventEntry";
+ }
+
+ /**
+ * The name of the SnapshotEventEntry entity to use when querying for snapshot events.
+ *
+ * @return The entity name of the SnapshotEventEntry subclass to use
+ *
+ * @see #createSnapshotEventEntry(String, org.axonframework.domain.DomainEventMessage,
+ * org.axonframework.serializer.SerializedObject, org.axonframework.serializer.SerializedObject)
+ */
+ protected String snapshotEventEntryEntityName() {
+ return "SnapshotEventEntry";
}
@Override
@@ -99,7 +168,7 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
maxSnapshotsArchived, entityManager);
if (redundantSnapshots.hasNext()) {
Long sequenceOfFirstSnapshotToPrune = redundantSnapshots.next();
- entityManager.createQuery("DELETE FROM SnapshotEventEntry e "
+ entityManager.createQuery("DELETE FROM " + snapshotEventEntryEntityName() + " e "
+ "WHERE e.type = :type "
+ "AND e.aggregateIdentifier = :aggregateIdentifier "
+ "AND e.sequenceNumber <= :sequenceOfFirstSnapshotToPrune")
@@ -125,9 +194,10 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
int maxSnapshotsArchived,
EntityManager entityManager) {
return entityManager.createQuery(
- "SELECT e.sequenceNumber FROM SnapshotEventEntry e "
+ "SELECT e.sequenceNumber FROM " + snapshotEventEntryEntityName() + " e "
+ "WHERE e.type = :type AND e.aggregateIdentifier = :aggregateIdentifier "
- + "ORDER BY e.sequenceNumber DESC")
+ + "ORDER BY e.sequenceNumber DESC"
+ )
.setParameter("type", type)
.setParameter("aggregateIdentifier", snapshotEvent.getAggregateIdentifier().toString())
.setFirstResult(maxSnapshotsArchived)
@@ -141,11 +211,8 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
long firstSequenceNumber,
int batchSize, EntityManager entityManager) {
- return new BatchingAggregateStreamIterator(firstSequenceNumber,
- identifier,
- aggregateType,
- batchSize,
- entityManager);
+ return new BatchingAggregateStreamIterator(firstSequenceNumber, identifier, aggregateType, batchSize,
+ domainEventEntryEntityName(), entityManager);
}
private static final class BatchingAggregateStreamIterator implements Iterator<SerializedDomainEventData> {
@@ -156,13 +223,15 @@ public void pruneSnapshots(String type, DomainEventMessage mostRecentSnapshotEve
private final Object id;
private final String typeId;
private final int batchSize;
+ private final String domainEventEntryEntityName;
private final EntityManager entityManager;
private BatchingAggregateStreamIterator(long firstSequenceNumber, Object id, String typeId, int batchSize,
- EntityManager entityManager) {
+ String domainEventEntryEntityName, EntityManager entityManager) {
this.id = id;
this.typeId = typeId;
this.batchSize = batchSize;
+ this.domainEventEntryEntityName = domainEventEntryEntityName;
this.entityManager = entityManager;
List<SerializedDomainEventData> firstBatch = fetchBatch(firstSequenceNumber);
this.currentBatchSize = firstBatch.size();
@@ -197,10 +266,11 @@ public SerializedDomainEventData next() {
"SELECT new org.axonframework.eventstore.jpa.SimpleSerializedDomainEventData("
+ "e.eventIdentifier, e.aggregateIdentifier, e.sequenceNumber, "
+ "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData) "
- + "FROM DomainEventEntry e "
+ + "FROM " + domainEventEntryEntityName + " e "
+ "WHERE e.aggregateIdentifier = :id AND e.type = :type "
+ "AND e.sequenceNumber >= :seq "
- + "ORDER BY e.sequenceNumber ASC")
+ + "ORDER BY e.sequenceNumber ASC"
+ )
.setParameter("id", id.toString())
.setParameter("type", typeId)
.setParameter("seq", firstSequenceNumber)
@@ -223,13 +293,17 @@ public void remove() {
private final String whereClause;
private final Map<String, Object> parameters;
private final int batchSize;
+
+ private final String domainEventEntryEntityName;
private final EntityManager entityManager;
public BatchingIterator(
- String whereClause, Map<String, Object> parameters, int batchSize, EntityManager entityManager) {
+ String whereClause, Map<String, Object> parameters, int batchSize, String domainEventEntryEntityName,
+ EntityManager entityManager) {
this.whereClause = whereClause;
this.parameters = parameters;
this.batchSize = batchSize;
+ this.domainEventEntryEntityName = domainEventEntryEntityName;
this.entityManager = entityManager;
List<SerializedDomainEventData> firstBatch = fetchBatch();
@@ -247,9 +321,11 @@ public BatchingIterator(
String.format("SELECT new org.axonframework.eventstore.jpa.SimpleSerializedDomainEventData("
+ "e.eventIdentifier, e.aggregateIdentifier, e.sequenceNumber, "
+ "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData) "
- + "FROM DomainEventEntry e %s ORDER BY e.timeStamp ASC, "
+ + "FROM " + domainEventEntryEntityName + " e %s ORDER BY e.timeStamp ASC, "
+ "e.sequenceNumber ASC, e.aggregateIdentifier ASC",
- buildWhereClause(params)))
+ buildWhereClause(params)
+ )
+ )
.setMaxResults(batchSize);
for (Map.Entry<String, Object> entry : params.entrySet()) {
Object value = entry.getValue();

0 comments on commit 29525e8

Please sign in to comment.