Skip to content

Commit

Permalink
Add Builder pattern for AbstractEventStore and implementations
Browse files Browse the repository at this point in the history
-Add abstract Builder pattern for AbstractEventStore
-Add Builder pattern for EmbeddedEventStore
-Add Builder pattern for AxonServerEventStore
-Change usages of constructor to builder
-Reindent touched files
-Update axon-4-api-changes.md

#754
  • Loading branch information
smcvb committed Oct 3, 2018
1 parent bb18f9d commit 616e487
Show file tree
Hide file tree
Showing 27 changed files with 634 additions and 269 deletions.
4 changes: 3 additions & 1 deletion axon-4-api-changes.md
Expand Up @@ -138,4 +138,6 @@ the other Builder implementations introduced. This entails the following changes
- org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine
- org.axonframework.eventsourcing.AbstractSnapshotter
- org.axonframework.eventsourcing.AggregateSnapshotter
- org.axonframework.spring.eventsourcing.SpringAggregateSnapshotter
- org.axonframework.spring.eventsourcing.SpringAggregateSnapshotter
- org.axonframework.eventsourcing.eventstore.AbstractEventStore
- org.axonframework.eventsourcing.eventstore.EmbeddedEventStore

Large diffs are not rendered by default.

Expand Up @@ -24,17 +24,14 @@
import org.axonframework.messaging.Message;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

public class AxonServerEventStoreTest {

Expand All @@ -48,8 +45,10 @@ public void setUp() throws Exception {
AxonServerConfiguration config = AxonServerConfiguration.newBuilder("localhost:6123", "JUNIT")
.flowControl(2, 1, 1)
.build();
PlatformConnectionManager platformConnectionManager = new PlatformConnectionManager(config);
testSubject = new AxonServerEventStore(config, platformConnectionManager, XStreamSerializer.builder().build());
testSubject = AxonServerEventStore.builder()
.configuration(config)
.platformConnectionManager(new PlatformConnectionManager(config))
.build();
}

@After
Expand All @@ -68,7 +67,7 @@ public void testPublishAndConsumeEvents() throws Exception {
TrackingEventStream stream = testSubject.openStream(null);

List<String> received = new ArrayList<>();
while(stream.hasNextAvailable(100, TimeUnit.MILLISECONDS)) {
while (stream.hasNextAvailable(100, TimeUnit.MILLISECONDS)) {
received.add(stream.nextAvailable().getPayload().toString());
}
stream.close();
Expand Down
Expand Up @@ -447,7 +447,10 @@ public Configurer configureEmbeddedEventStore(Function<Configuration, EventStora
return configureEventStore(c -> {
MessageMonitor<Message<?>> monitor =
messageMonitorFactoryComponent.get().apply(EmbeddedEventStore.class, "eventStore");
EmbeddedEventStore eventStore = new EmbeddedEventStore(storageEngineBuilder.apply(c), monitor);
EmbeddedEventStore eventStore = EmbeddedEventStore.builder()
.storageEngine(storageEngineBuilder.apply(c))
.messageMonitor(monitor)
.build();
c.onShutdown(eventStore::shutDown);
return eventStore;
});
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.axonframework.eventsourcing.eventstore;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
Expand All @@ -29,10 +30,13 @@
import java.util.Optional;
import java.util.stream.Stream;

import static org.axonframework.common.BuilderUtils.assertNonNull;

/**
* Abstract implementation of an {@link EventStore} that uses a {@link EventStorageEngine} to store and load events.
*
* @author Rene de Waele
* @since 3.0
*/
public abstract class AbstractEventStore extends AbstractEventBus implements EventStore {

Expand All @@ -41,24 +45,17 @@ public abstract class AbstractEventStore extends AbstractEventBus implements Eve
private final EventStorageEngine storageEngine;

/**
* Initializes an event store with given {@code storageEngine} and {@link NoOpMessageMonitor}.
*
* @param storageEngine The storage engine used to store and load events
*/
protected AbstractEventStore(EventStorageEngine storageEngine) {
this(storageEngine, NoOpMessageMonitor.INSTANCE);
}

/**
* Initialize an event store with given {@code storageEngine} and {@code messageMonitor}.
* Instantiate an {@link AbstractEventStore} based on the fields contained in the {@link Builder}.
* <p>
* Will assert that the {@link EventStorageEngine} is not {@code null}, and will throw an
* {@link AxonConfigurationException} if it is {@code null}.
*
* @param storageEngine The storage engine used to store and load events
* @param messageMonitor The monitor used to record event publications
* @param builder the {@link Builder} used to instantiate a {@link AbstractEventStore} instance
*/
protected AbstractEventStore(EventStorageEngine storageEngine,
MessageMonitor<? super EventMessage<?>> messageMonitor) {
super(messageMonitor);
this.storageEngine = storageEngine;
protected AbstractEventStore(Builder builder) {
super(builder.messageMonitor);
builder.validate();
this.storageEngine = builder.storageEngine;
}

@Override
Expand Down Expand Up @@ -110,6 +107,7 @@ public DomainEventStream readEvents(String aggregateIdentifier) {
* @param aggregateIdentifier The identifier of the aggregate for which an snapshot failed to load
* @param e The exception or error that occurred while loading or deserializing the snapshot
* @return An optional DomainEventMessage to use as the snapshot for this aggregate
*
* @throws RuntimeException any runtimeException to fail loading the
*/
protected Optional<DomainEventMessage<?>> handleSnapshotReadingError(String aggregateIdentifier, Throwable e) {
Expand Down Expand Up @@ -179,4 +177,50 @@ public TrackingToken createHeadToken() {
public TrackingToken createTokenAt(Instant dateTime) {
return storageEngine.createTokenAt(dateTime);
}

/**
* Abstract Builder class to instantiate an {@link AbstractEventStore}.
* <p>
* The {@link MessageMonitor} is defaulted to an {@link NoOpMessageMonitor}. The {@link EventStorageEngine} is a
* <b>hard requirement</b> and as such should be provided.
*/
public abstract static class Builder {

protected EventStorageEngine storageEngine;
private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;

/**
* Sets the {@link EventStorageEngine} used to store and load events.
*
* @param storageEngine the {@link EventStorageEngine} used to store and load events
* @return the current Builder instance, for a fluent interfacing
*/
public Builder storageEngine(EventStorageEngine storageEngine) {
assertNonNull(storageEngine, "EventStorageEngine may not be null");
this.storageEngine = storageEngine;
return this;
}

/**
* Sets the {@link MessageMonitor} to monitor {@link EventMessage}s. Defaults to a {@link NoOpMessageMonitor}.
*
* @param messageMonitor The message monitor used to monitor query messages
* @return the current Builder instance, for fluent interfacing
*/
public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
assertNonNull(messageMonitor, "MessageMonitor may not be null");
this.messageMonitor = messageMonitor;
return this;
}

/**
* Validate whether the fields contained in this Builder are set accordingly.
*
* @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
* specifications
*/
protected void validate() throws AxonConfigurationException {
assertNonNull(storageEngine, "The EventStorageEngine is a hard requirement and should be provided");
}
}
}

0 comments on commit 616e487

Please sign in to comment.