Skip to content

Commit

Permalink
If snapshotFilter set delegate to the super.readEvents to read snapsh…
Browse files Browse the repository at this point in the history
…ots in a separate Axon Server call.

Fixes #2285
  • Loading branch information
MGathier committed Jul 14, 2022
1 parent 6445c1d commit 24b62d2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ protected Optional<DomainEventMessage<?>> handleSnapshotReadingError(String aggr

@Override
public DomainEventStream readEvents(String aggregateIdentifier) {
if (Objects.equals(storageEngine().eventSerializer, storageEngine().snapshotSerializer)) {
if (!storageEngine().snapshotFilterSet && Objects.equals(storageEngine().eventSerializer, storageEngine().snapshotSerializer)) {
return storageEngine().readEventsWithAutoSnapshot(aggregateIdentifier, storageEngine().eventSerializer);
}
return super.readEvents(aggregateIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.axonframework.serialization.upcasting.event.ContextAwareSingleEventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.*;

import java.util.ArrayList;
Expand Down Expand Up @@ -418,6 +419,49 @@ void testReadEventsWithMagicSequenceNumberAndSnapshotFilterSetIgnoresSnapshots()
assertFalse(resultStream.hasNext());
}

@Test
void testReadEventsWithSnapshotFilterAndSameSerializerSetReadsSnapshot() {
XStreamSerializer serializer = TestSerializer.xStreamSerializer();
testSubject = AxonServerEventStore.builder()
.configuration(config)
.platformConnectionManager(axonServerConnectionManager)
.upcasterChain(upcasterChain)
.eventSerializer(serializer)
.snapshotSerializer(serializer)
.snapshotFilter(SnapshotFilter.allowAll())
.build();

Map<String, String> testMetaData = Collections.singletonMap("key", "value");
testSubject.storeSnapshot(
new GenericDomainEventMessage<>(AGGREGATE_TYPE, AGGREGATE_ID, 1, "Snapshot1", testMetaData)
);
testSubject.publish(new GenericDomainEventMessage<>(AGGREGATE_TYPE, AGGREGATE_ID, 0, "Test1", testMetaData),
new GenericDomainEventMessage<>(AGGREGATE_TYPE, AGGREGATE_ID, 1, "Test2", testMetaData),
new GenericDomainEventMessage<>(AGGREGATE_TYPE, AGGREGATE_ID, 2, "Test3", testMetaData));

// Snapshot storage is async, so we need to make sure the first event through "readEvents" is the snapshot
assertWithin(2, TimeUnit.SECONDS, () -> {
DomainEventStream snapshotValidationStream = testSubject.readEvents(AGGREGATE_ID);
assertTrue(snapshotValidationStream.hasNext());
assertEquals("Snapshot1", snapshotValidationStream.next().getPayload());
});

DomainEventStream resultStream = testSubject.readEvents(AGGREGATE_ID);

assertTrue(resultStream.hasNext());
DomainEventMessage<?> firstResultEvent = resultStream.next();
assertEquals("Snapshot1", firstResultEvent.getPayload());
assertTrue(firstResultEvent.getMetaData().containsKey("key"));
assertTrue(firstResultEvent.getMetaData().containsValue("value"));

assertTrue(resultStream.hasNext());
DomainEventMessage<?> thirdResultEvent = resultStream.next();
assertEquals("Test3", thirdResultEvent.getPayload());
assertTrue(thirdResultEvent.getMetaData().containsKey("key"));
assertTrue(thirdResultEvent.getMetaData().containsValue("value"));

assertFalse(resultStream.hasNext());
}
@Test
void testRethrowStatusRuntimeExceptionAsEventStoreExceptionIfNotOfTypeUnknown() {
String testAggregateId = AGGREGATE_ID;
Expand Down

0 comments on commit 24b62d2

Please sign in to comment.