Permalink
Browse files

Added duplicate key exception checks on snapshot appending

Similar to inserting domain events, appending a snapshot event when an entry
already exists with the same aggregate identifier and sequence number will
throw a ConcurrencyException.
The snapshotter catches this exception and logs a single line on info level,
instead of logging the entire stacktrace.

Issue #AXON-248 Fixed
  • Loading branch information...
1 parent d5f8526 commit 448f21218f13467f67d5216cccb67c1fc977e78f @abuijze abuijze committed Jul 16, 2014
@@ -192,13 +192,13 @@ public void run() {
try {
snapshotterTask.run();
} catch (ConcurrencyException e) {
- logger.info("Failed to insert a snapshot. A snapshot entry already exists.");
+ logger.info("An up-to-date snapshot entry already exists, ignoring this attempts.");
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.warn("An attempt to create and store a snapshot resulted in an exception:", e);
} else {
logger.warn("An attempt to create and store a snapshot resulted in an exception. "
- + "Exception summary: {}", e);
+ + "Exception summary: {}", e.getMessage());
}
}
}
@@ -242,8 +242,21 @@ public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
// an aggregate, which may occur when a READ_UNCOMMITTED transaction isolation level is used.
SerializedObject<byte[]> serializedPayload = serializer.serializePayload(snapshotEvent, byte[].class);
SerializedObject<byte[]> serializedMetaData = serializer.serializeMetaData(snapshotEvent, byte[].class);
- eventEntryStore.persistSnapshot(type, snapshotEvent, serializedPayload, serializedMetaData);
-
+ try {
+ eventEntryStore.persistSnapshot(type, snapshotEvent, serializedPayload, serializedMetaData);
+ } catch (RuntimeException exception) {
+ if (persistenceExceptionResolver != null
+ && persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
+ //noinspection ConstantConditions
+ throw new ConcurrencyException(
+ String.format("A snapshot for aggregate [%s] at sequence: [%s] was already inserted",
+ snapshotEvent.getAggregateIdentifier(),
+ snapshotEvent.getSequenceNumber()),
+ exception
+ );
+ }
+ throw exception;
+ }
if (maxSnapshotsArchived > 0) {
eventEntryStore.pruneSnapshots(type, snapshotEvent, maxSnapshotsArchived);
}
@@ -18,6 +18,7 @@
import org.axonframework.common.Assert;
import org.axonframework.common.io.IOUtils;
+import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
@@ -86,7 +87,7 @@
private int batchSize = DEFAULT_BATCH_SIZE;
private UpcasterChain upcasterChain = SimpleUpcasterChain.EMPTY;
private int maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
- private org.axonframework.common.jdbc.PersistenceExceptionResolver persistenceExceptionResolver;
+ private PersistenceExceptionResolver persistenceExceptionResolver;
/**
* Initialize a JpaEventStore using an {@link org.axonframework.serializer.xml.XStreamSerializer}, which
@@ -199,12 +200,12 @@ public DomainEventStream readEvents(String type, Object identifier) {
logger.warn("Error while reading snapshot event entry. "
+ "Reconstructing aggregate on entire event stream. Caused by: {} {}",
ex.getClass().getName(),
- ex.getMessage() );
+ ex.getMessage());
} catch (LinkageError error) {
logger.warn("Error while reading snapshot event entry. "
+ "Reconstructing aggregate on entire event stream. Caused by: {} {}",
error.getClass().getName(),
- error.getMessage() );
+ error.getMessage());
}
}
@@ -251,11 +252,25 @@ public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
// an aggregate, which may occur when a READ_UNCOMMITTED transaction isolation level is used.
SerializedObject<byte[]> serializedPayload = serializer.serializePayload(snapshotEvent, byte[].class);
SerializedObject<byte[]> serializedMetaData = serializer.serializeMetaData(snapshotEvent, byte[].class);
- eventEntryStore.persistSnapshot(type, snapshotEvent, serializedPayload, serializedMetaData, entityManager);
+ try {
+ eventEntryStore.persistSnapshot(type, snapshotEvent, serializedPayload, serializedMetaData, entityManager);
+ if (maxSnapshotsArchived > 0) {
+ eventEntryStore.pruneSnapshots(type, snapshotEvent, maxSnapshotsArchived,
+ entityManagerProvider.getEntityManager());
+ }
- if (maxSnapshotsArchived > 0) {
- eventEntryStore.pruneSnapshots(type, snapshotEvent, maxSnapshotsArchived,
- entityManagerProvider.getEntityManager());
+ entityManager.flush();
+ } catch (RuntimeException exception) {
+ if (snapshotEvent != null
+ && persistenceExceptionResolver != null
+ && persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
+ throw new ConcurrencyException(
+ String.format("A snapshot for aggregate [%s] at sequence: [%s] was already inserted",
+ snapshotEvent.getAggregateIdentifier(),
+ snapshotEvent.getSequenceNumber()),
+ exception);
+ }
+ throw exception;
}
}
@@ -293,7 +308,7 @@ private void doVisitEvents(EventVisitor visitor, String whereClause, Map<String,
* Registers the data source that allows the EventStore to detect the database type and define the error codes that
* represent concurrent access failures.
* <p/>
- * Should not be used in combination with {@link #setPersistenceExceptionResolver(org.axonframework.common.jdbc.PersistenceExceptionResolver)},
+ * Should not be used in combination with {@link #setPersistenceExceptionResolver(PersistenceExceptionResolver)},
* but rather as a shorthand alternative for most common database types.
*
* @param dataSource A data source providing access to the backing database
@@ -311,8 +326,7 @@ public void setDataSource(DataSource dataSource) throws SQLException {
* @param persistenceExceptionResolver the persistenceExceptionResolver that will help detect concurrency
* exceptions
*/
- public void setPersistenceExceptionResolver(
- org.axonframework.common.jdbc.PersistenceExceptionResolver persistenceExceptionResolver) {
+ public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
this.persistenceExceptionResolver = persistenceExceptionResolver;
}
@@ -348,6 +362,14 @@ public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
this.maxSnapshotsArchived = maxSnapshotsArchived;
}
+ private static class NoOpPersistenceExceptionResolver implements PersistenceExceptionResolver {
+
+ @Override
+ public boolean isDuplicateKeyViolation(Exception exception) {
+ return false;
+ }
+ }
+
private final class CursorBackedDomainEventStream implements DomainEventStream, Closeable {
private Iterator<DomainEventMessage> currentBatch;
@@ -17,14 +17,26 @@
package org.axonframework.eventsourcing;
import org.axonframework.common.ReflectionUtils;
+import org.axonframework.domain.GenericDomainEventMessage;
+import org.axonframework.domain.SimpleDomainEventStream;
+import org.axonframework.domain.StubAggregate;
+import org.axonframework.eventstore.EventStore;
import org.junit.*;
import org.junit.runner.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -39,6 +51,13 @@
@Qualifier("inThreadsnapshotter")
private SpringAggregateSnapshotter snapshotter;
+ @Autowired
+ @Qualifier("eventStore")
+ private EventStore eventStore;
+
+ @Autowired
+ private PlatformTransactionManager transactionManager;
+
@SuppressWarnings({"unchecked"})
@Test
public void testSnapshotterKnowsAllFactories() throws NoSuchFieldException {
@@ -47,4 +66,25 @@ public void testSnapshotterKnowsAllFactories() throws NoSuchFieldException {
assertFalse("No snapshotters found", snapshotters.isEmpty());
}
+
+ @Test
+ public void testDuplicateSnapshotIsIgnored() throws Exception {
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
+ snapshotter.setAggregateFactories(Arrays.<AggregateFactory<?>>asList(new GenericAggregateFactory<StubAggregate>(StubAggregate.class)));
+ new TransactionTemplate(transactionManager).execute(new TransactionCallbackWithoutResult() {
+ @Override
+ protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
+ eventStore.appendEvents("StubAggregate", new SimpleDomainEventStream(new GenericDomainEventMessage("id1", 0, "Payload1"),
+ new GenericDomainEventMessage("id1", 1, "Payload2")));
+ }
+ });
+ try {
+ snapshotter.setExecutor(executor);
+ snapshotter.scheduleSnapshot("StubAggregate", "id1");
+ snapshotter.scheduleSnapshot("StubAggregate", "id1");
+ } finally {
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
}
@@ -277,6 +277,19 @@ public void testLoadWithSnapshotEvent() {
assertEquals(2, domainEvents.size());
}
+ @Transactional
+ @Test
+ public void testInsertDuplicateSnapshot() throws Exception {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ try {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ fail("Expected concurrency exception");
+ } catch (ConcurrencyException e) {
+ assertTrue(e.getMessage().contains("snapshot"));
+ }
+ }
+
+
@Test(expected = EventStreamNotFoundException.class)
@Transactional
public void testLoadNonExistent() {
@@ -504,6 +504,18 @@ public void testLoadNonExistent() {
testSubject.readEvents("Stub", UUID.randomUUID());
}
+ @Transactional
+ @Test
+ public void testInsertDuplicateSnapshot() throws Exception {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ try {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ fail("Expected concurrency exception");
+ } catch (ConcurrencyException e) {
+ assertTrue(e.getMessage().contains("snapshot"));
+ }
+ }
+
@Test
@Transactional
public void testVisitAllEvents() {
@@ -191,7 +191,12 @@ public DomainEventStream readEvents(String type, Object identifier, long firstSe
public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
final DBObject dbObject = storageStrategy.createDocuments(type, eventSerializer,
Collections.singletonList(snapshotEvent))[0];
- mongoTemplate.snapshotEventCollection().insert(dbObject);
+ try {
+ mongoTemplate.snapshotEventCollection().insert(dbObject);
+ } catch (MongoException.DuplicateKey e) {
+ throw new ConcurrencyException("Trying to insert a SnapshotEvent with aggregate identifier and sequence "
+ + "number that is already present in the Event Store", e);
+ }
if (logger.isDebugEnabled()) {
logger.debug("snapshot event of type {} appended.");
}
@@ -284,6 +284,17 @@ public void testLoadWithMultipleSnapshotEvents() {
assertEquals(2, domainEvents.size());
}
+ @Test
+ public void testInsertDuplicateSnapshot() throws Exception {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ try {
+ testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage<String>("id1", 1, "test"));
+ fail("Expected concurrency exception");
+ } catch (ConcurrencyException e) {
+ assertTrue(e.getMessage().contains("Snapshot"));
+ }
+ }
+
@DirtiesContext
@Test(expected = EventStreamNotFoundException.class)
public void testLoadNonExistent() {

0 comments on commit 448f212

Please sign in to comment.