Skip to content
Permalink
Browse files

implement stateful event-sourced sagas, add some debug info for

 VersionMismatchException, add alternative constructors for
 CommandHandlers/Sagas, fixed wrong read-only transaction property
 ordering for JdbcStoreProvider. EmptyStreamException now will not be
 thrown on querying aggregate store. ZoneDateTime now correctly
 deserialized by Jackson Parser.
  • Loading branch information...
egetman committed Oct 10, 2019
1 parent f5a5d40 commit d9f2f9b10850bfcde3c8be026586600cb96b9fe5
Showing with 486 additions and 66 deletions.
  1. +3 −7 demo/write-part/src/main/java/store/jesframework/writemodel/handler/OrderHandler.java
  2. +8 −1 pom.xml
  3. +35 −11 src/main/java/store/jesframework/AggregateStore.java
  4. +1 −6 src/main/java/store/jesframework/JEventStore.java
  5. +43 −0 src/main/java/store/jesframework/common/ContextUpdated.java
  6. +2 −2 src/main/java/store/jesframework/common/Failure.java
  7. +7 −2 src/main/java/store/jesframework/ex/VersionMismatchException.java
  8. +8 −0 src/main/java/store/jesframework/handler/CommandHandler.java
  9. +22 −11 src/main/java/store/jesframework/provider/InMemoryStoreProvider.java
  10. +4 −2 src/main/java/store/jesframework/provider/JdbcStoreProvider.java
  11. +1 −1 src/main/java/store/jesframework/provider/JpaStoreProvider.java
  12. +29 −0 src/main/java/store/jesframework/reactors/ReactorUtils.java
  13. +160 −4 src/main/java/store/jesframework/reactors/Saga.java
  14. +2 −0 src/main/java/store/jesframework/serializer/JacksonSerializer.java
  15. +0 −2 src/main/java/store/jesframework/snapshot/SnapshotReader.java
  16. +8 −7 src/test/java/store/jesframework/JEventStoreTest.java
  17. +21 −4 src/test/java/store/jesframework/handler/CommandHandlerTest.java
  18. +1 −1 src/test/java/store/jesframework/handler/HandlerUtilsTest.java
  19. +100 −0 src/test/java/store/jesframework/reactors/StatefullSagaTest.java
  20. +21 −0 src/test/java/store/jesframework/snapshot/JdbcSnapshotProviderTest.java
  21. +10 −5 src/test/java/store/jesframework/snapshot/SnapshotProviderTest.java
@@ -5,25 +5,21 @@

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;
import store.jesframework.AggregateStore;
import store.jesframework.JEventStore;
import store.jesframework.bus.CommandBus;
import store.jesframework.handler.CommandHandler;
import store.jesframework.handler.Handle;
import store.jesframework.writemodel.command.PlaceOrder;
import store.jesframework.writemodel.domain.Item;
import store.jesframework.writemodel.event.OrderPlaced;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class OrderHandler extends CommandHandler {

private final AggregateStore aggregateStore;

public OrderHandler(@Nonnull JEventStore store, @Nonnull CommandBus bus, @Nonnull AggregateStore aggregateStore) {
super(store, bus);
this.aggregateStore = aggregateStore;
public OrderHandler(@Nonnull AggregateStore aggregateStore, @Nonnull CommandBus bus) {
super(aggregateStore, bus);
}

@Handle
@@ -53,9 +53,10 @@
<lombok.version>1.18.10</lombok.version>
<logback.version>1.2.3</logback.version>
<jackson.version>2.9.10</jackson.version>
<jackson.databind.version>2.9.10</jackson.databind.version>
<redisson.version>3.11.3</redisson.version>
<byte-buddy.version>1.10.1</byte-buddy.version>
<jackson.databind.version>2.9.10</jackson.databind.version>
<java-uuid-generator.version>3.2.0</java-uuid-generator.version>
<javax.persistence-api.version>2.2</javax.persistence-api.version>

<!-- test dependencies versions -->
@@ -139,6 +140,12 @@
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
<version>${java-uuid-generator.version}</version>
</dependency>

<!-- for distributed locks/offset -->
<dependency>
<groupId>org.redisson</groupId>
@@ -4,7 +4,6 @@
import java.util.UUID;
import javax.annotation.Nonnull;

import store.jesframework.ex.EmptyEventStreamException;
import store.jesframework.snapshot.AlwaysSnapshotStrategy;
import store.jesframework.snapshot.NoopSnapshotProvider;
import store.jesframework.snapshot.SnapshotProvider;
@@ -38,28 +37,53 @@ public AggregateStore(@Nonnull JEventStore eventStore, @Nonnull SnapshotProvider
}

/**
* Returns specified aggregate of type {@code type} with restored state from {@link JEventStore}.
* Note: if {@link SnapshotProvider} was specified during {@link AggregateStore} initialization, snapshotting
* (aggregate caching) will be performed on {@code this#readBy(UUID, Class)} calls based on {@link SnapshotProvider}
* implementation.
* This method provide the ability to retrieve the delegate instance.
*
* @param uuid identifier of event stream (uuid) to read.
* @param type class of aggregate to load
* @param <T> type of aggregate.
* @return the underlying event store.
*/
public JEventStore unwrap() {
return eventStore;
}

/**
* Returns specified aggregate of type {@code type} with restored state from {@link JEventStore}. Note: if {@link
* SnapshotProvider} was specified during {@link AggregateStore} initialization, snapshotting (aggregate caching)
* will be performed on {@code this#readBy(UUID, Class)} calls based on {@link SnapshotStrategy} implementation.
*
* @param uuid identifier of the event stream (uuid) to read.
* @param type class of the aggregate to load.
* @param <T> type of the aggregate.
* @return recreated/restored form {@link JEventStore} aggregate instance.
* @throws NullPointerException if any of {@code uuid}/{@code type} is null.
* @throws EmptyEventStreamException if no event stream found by given {@code uuid}.
*/
@Nonnull
public <T extends Aggregate> T readBy(@Nonnull UUID uuid, @Nonnull Class<T> type) {
final T aggregate = snapshotProvider.initialStateOf(uuid, type);
return readBy(uuid, aggregate);
}

/**
* Returns specified aggregate with refreshed state from {@link JEventStore}. Note: if {@link SnapshotProvider} was
* specified during {@link AggregateStore} initialization, snapshotting (aggregate caching) will be performed on
* {@code this#readBy(UUID, Class)} calls based on {@link SnapshotStrategy} implementation.
*
* @param uuid identifier of the event stream (uuid) to read.
* @param aggregate instance of the aggregate to refresh.
* @param <T> type of the aggregate.
* @return refreshed form {@link JEventStore} aggregate instance.
* @throws NullPointerException if any of {@code uuid}/{@code type} is null.
*/
@Nonnull
public <T extends Aggregate> T readBy(@Nonnull UUID uuid, @Nonnull T aggregate) {
final Collection<Event> events = eventStore.readBy(uuid, aggregate.streamVersion());
if (events.isEmpty()) {
return aggregate;
}
aggregate.handleEventStream(events);
return snapshotStrategy.isSnapshotNecessary(aggregate, events) ? snapshotProvider.snapshot(aggregate) :
aggregate;
if (snapshotStrategy.isSnapshotNecessary(aggregate, events)) {
return snapshotProvider.snapshot(aggregate);
}
return aggregate;
}

/**
@@ -6,10 +6,8 @@
import java.util.stream.Stream;
import javax.annotation.Nonnull;

import store.jesframework.ex.EmptyEventStreamException;
import store.jesframework.provider.StoreProvider;
import store.jesframework.snapshot.SnapshotReader;
import store.jesframework.util.Check;

import static java.util.Objects.requireNonNull;

@@ -53,12 +51,9 @@ public JEventStore(@Nonnull StoreProvider provider) {
* @param uuid identifier of event uuid to read.
* @return {@link Collection} of events stored in that {@literal EventStore}, grouped by {@literal uuid}.
* @throws NullPointerException if uuid is null.
* @throws EmptyEventStreamException if event stream with given {@code uuid} not found.
*/
public Collection<Event> readBy(@Nonnull UUID uuid) {
final Collection<Event> events = provider.readBy(requireNonNull(uuid, NON_NULL_UUID));
Check.nonEmpty(events, () -> new EmptyEventStreamException("Event stream with uuid " + uuid + " not found"));
return events;
return provider.readBy(requireNonNull(uuid, NON_NULL_UUID));
}

Collection<Event> readBy(@Nonnull UUID uuid, long skip) {
@@ -0,0 +1,43 @@
package store.jesframework.common;

import java.beans.ConstructorProperties;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import store.jesframework.Event;

@ToString
@EqualsAndHashCode
public class ContextUpdated implements Event {

private final UUID uuid;
@Getter
private final String key;
@Getter
private final Object value;
private final long version;

@ConstructorProperties({"uuid", "key", "value", "version"})
public ContextUpdated(@Nonnull UUID uuid, @Nonnull String key, @Nullable Object value, long version) {
this.uuid = Objects.requireNonNull(uuid, "UUID must not be null");
this.key = Objects.requireNonNull(key, "Key must not be null");
this.value = value;
this.version = version;
}

@Nullable
@Override
public UUID uuid() {
return uuid;
}

@Override
public long expectedStreamVersion() {
return version;
}
}
@@ -1,7 +1,7 @@
package store.jesframework.common;

import java.beans.ConstructorProperties;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nonnull;
@@ -29,7 +29,7 @@
/**
* A date-time of the incident. Default is #now().
*/
private final LocalDateTime when = LocalDateTime.now();
private final ZonedDateTime when = ZonedDateTime.now();

/**
* Constructor for {@literal Failure} event.
@@ -1,10 +1,15 @@
package store.jesframework.ex;

import java.util.UUID;

import static java.lang.String.format;

public class VersionMismatchException extends RuntimeException {

public VersionMismatchException(long expected, long actual) {
super(format("Event uuid version mismatch. Expected version: [%d] Actual version: [%d]", expected, actual));
private static final String TEMPLATE = "Event stream version mismatch. Expected version: [%d] actual version: "
+ "[%d] for stream %s";

public VersionMismatchException(UUID uuid, long expected, long actual) {
super(format(TEMPLATE, expected, actual, uuid));
}
}
@@ -8,6 +8,7 @@
import java.util.function.Consumer;
import javax.annotation.Nonnull;

import store.jesframework.AggregateStore;
import store.jesframework.Command;
import store.jesframework.JEventStore;
import store.jesframework.bus.CommandBus;
@@ -23,6 +24,13 @@
public abstract class CommandHandler {

protected final JEventStore store;
// for cases when you need some validation against aggregate state
protected AggregateStore aggregateStore;

public CommandHandler(@Nonnull AggregateStore aggregateStore, @Nonnull CommandBus bus) {
this(aggregateStore.unwrap(), bus);
this.aggregateStore = aggregateStore;
}

public CommandHandler(@Nonnull JEventStore store, @Nonnull CommandBus bus) {
Objects.requireNonNull(bus, "Command bus must not be null");
@@ -7,6 +7,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
@@ -21,6 +23,7 @@

private final List<Event> events = new CopyOnWriteArrayList<>();
private final Map<UUID, LongAdder> streamsVersions = new ConcurrentHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();

@Override
public Stream<Event> readFrom(long offset) {
@@ -33,20 +36,28 @@
return events.stream().filter(event -> uuid.equals(event.uuid())).collect(Collectors.toList());
}

// todo: need to make it thread safe to write versioned events
// not optimal exclusive write
@Override
public void write(@Nonnull Event event) {
if (event.uuid() != null) {
streamsVersions.putIfAbsent(event.uuid(), new LongAdder());
final LongAdder actualVersion = streamsVersions.get(event.uuid());
final long expectedVersion = event.expectedStreamVersion();
if (expectedVersion != -1 && actualVersion.longValue() != expectedVersion) {
throw new VersionMismatchException(expectedVersion, actualVersion.longValue());
try {
lock.writeLock().lock();
if (event.uuid() != null) {
final long expectedVersion = event.expectedStreamVersion();
// check current event stream version
if (expectedVersion != -1) {
final LongAdder actual = streamsVersions.computeIfAbsent(event.uuid(), uuid -> new LongAdder());
if (actual.longValue() != expectedVersion) {
throw new VersionMismatchException(event.uuid(), expectedVersion, actual.longValue());
}
}
}
}
events.add(event);
if (event.uuid() != null) {
streamsVersions.get(event.uuid()).increment();
events.add(event);
// update written event stream version
if (event.uuid() != null) {
streamsVersions.computeIfAbsent(event.uuid(), uuid -> new LongAdder()).increment();
}
} finally {
lock.writeLock().unlock();
}
}

@@ -102,8 +102,9 @@ private void createEventStore(@Nonnull Connection connection, @Nonnull String dd
private Stream<Event> readBy(@Nonnull String from, @Nonnull Object... values) {
final Connection connection = createConnection(dataSource);
try {
connection.setReadOnly(true);
// order of calls matters
connection.setAutoCommit(false);
connection.setReadOnly(true);

final PreparedStatement statement = connection.prepareStatement(from, TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
statement.setFetchSize(FETCH_SIZE);
@@ -200,7 +201,7 @@ private void verifyStreamVersion(Event event, Connection connection) {
final long actualVersion = resultSet.getLong(1);
if (expectedVersion != actualVersion) {
log.error("Version mismatch detected for {}", event);
throw new VersionMismatchException(expectedVersion, actualVersion);
throw new VersionMismatchException(uuid, expectedVersion, actualVersion);
}
}
}
@@ -317,6 +318,7 @@ public void close() {
private ResultSetIterator createIterator(@Nonnull String from, long offset) {
final Connection connection = createConnection(dataSource);
try {
// order of calls matters
connection.setAutoCommit(false);
connection.setReadOnly(true);

@@ -131,7 +131,7 @@ public void write(@Nonnull Event event) {
});

if (expectedVersion != actualVersion) {
throw new VersionMismatchException(expectedVersion, actualVersion);
throw new VersionMismatchException(uuid, expectedVersion, actualVersion);
}
}

@@ -2,16 +2,38 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nonnull;

import com.fasterxml.uuid.impl.NameBasedGenerator;

import lombok.extern.slf4j.Slf4j;
import store.jesframework.Event;
import store.jesframework.ex.BrokenReactorException;
import store.jesframework.util.Check;

import static com.fasterxml.uuid.Generators.nameBasedGenerator;
import static com.fasterxml.uuid.impl.NameBasedGenerator.NAMESPACE_DNS;
import static java.security.MessageDigest.getInstance;

@Slf4j
class ReactorUtils {

private static final NameBasedGenerator GENERATOR;

static {
try {
GENERATOR = nameBasedGenerator(NAMESPACE_DNS, getInstance("SHA-1"));
} catch (NoSuchAlgorithmException e) {
log.error("Failed to find sha-1 impl: ", e);
throw new IllegalStateException(e);
}
}

private ReactorUtils() {}

@Nonnull
@@ -58,4 +80,11 @@ static void invokeReactsOn(@Nonnull Method method, @Nonnull Object source, @Nonn
}
}

/**
* Method for generating name-based UUIDs using specified name (serialized to bytes using UTF-8 encoding).
*/
static UUID uuidByKey(@Nonnull String key) {
Objects.requireNonNull(key, "Key must not be null");
return GENERATOR.generate(key);
}
}

0 comments on commit d9f2f9b

Please sign in to comment.
You can’t perform that action at this time.