Skip to content

Commit

Permalink
Change event store API to allow for non-blocking event processing
Browse files Browse the repository at this point in the history
When a stream is opened a TrackingEventStream is returned (as opposed to a Java Stream) with blocking and non-blocking iterator methods.
  • Loading branch information
renedewaele committed May 12, 2016
1 parent 88fc2ce commit e0e152f
Show file tree
Hide file tree
Showing 42 changed files with 848 additions and 600 deletions.
Expand Up @@ -71,7 +71,7 @@ public class SpringAMQPEventBus extends AbstractEventBus implements Initializing
private long publisherAckTimeout;

@Override
protected void prepareCommit(List<EventMessage<?>> events) {
protected void prepareCommit(List<? extends EventMessage<?>> events) {
Channel channel = connectionFactory.createConnection().createChannel(isTransactional);
try {
if (waitForAck) {
Expand Down
Expand Up @@ -27,10 +27,9 @@
import org.axonframework.commandhandling.model.inspection.EventSourcedAggregate;
import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.Assert;
import org.axonframework.common.PeekingIterator;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.DomainEventStream;
import org.axonframework.eventstore.EventStore;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.slf4j.Logger;
Expand All @@ -40,7 +39,6 @@
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/**
* Component of the DisruptorCommandBus that invokes the command handler. The execution is done within a Unit Of Work.
Expand Down Expand Up @@ -205,21 +203,16 @@ public Aggregate<T> load(String aggregateIdentifier) {
if (aggregateRoot == null) {
logger.debug("Aggregate {} not in first level cache, loading fresh one from Event Store",
aggregateIdentifier);
Stream<? extends DomainEventMessage<?>> events = eventStore.readEvents(aggregateIdentifier);
try {
events = decorator.decorateForRead(aggregateIdentifier, events);
PeekingIterator<? extends DomainEventMessage<?>> iterator = PeekingIterator.of(events.iterator());
if (!iterator.hasNext()) {
throw new AggregateNotFoundException(aggregateIdentifier,
"The aggregate was not found in the event store");
}
aggregateRoot = EventSourcedAggregate
.initialize(aggregateFactory.createAggregate(aggregateIdentifier, iterator.peek()),
model, eventStore);
aggregateRoot.initializeState(iterator);
} finally {
events.close();
DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier);
eventStream = decorator.decorateForRead(aggregateIdentifier, eventStream);
if (!eventStream.hasNext()) {
throw new AggregateNotFoundException(aggregateIdentifier,
"The aggregate was not found in the event store");
}
aggregateRoot = EventSourcedAggregate
.initialize(aggregateFactory.createAggregate(aggregateIdentifier, eventStream.peek()),
model, eventStore);
aggregateRoot.initializeState(eventStream);
firstLevelCache.put(aggregateRoot, PLACEHOLDER_VALUE);
cache.put(aggregateIdentifier, aggregateRoot);
}
Expand All @@ -228,8 +221,7 @@ public Aggregate<T> load(String aggregateIdentifier) {

@Override
public Aggregate<T> newInstance(Callable<T> factoryMethod) throws Exception {
EventSourcedAggregate<T> aggregate = EventSourcedAggregate.initialize(factoryMethod, model,
eventStore);
EventSourcedAggregate<T> aggregate = EventSourcedAggregate.initialize(factoryMethod, model, eventStore);
firstLevelCache.put(aggregate, PLACEHOLDER_VALUE);
cache.put(aggregate.identifier(), aggregate);
return aggregate;
Expand Down
Expand Up @@ -24,10 +24,10 @@
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.DomainEventStream;
import org.axonframework.eventstore.EventStore;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
Expand All @@ -39,7 +39,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Stream;

import static java.lang.String.format;

Expand Down Expand Up @@ -125,8 +124,8 @@ public class DisruptorCommandBus implements CommandBus {
*
* @param eventStore The EventStore where generated events must be stored
*/
public DisruptorCommandBus(EventStore eventStore, EventBus eventBus) {
this(eventStore, eventBus, new DisruptorConfiguration());
public DisruptorCommandBus(EventStore eventStore) {
this(eventStore, new DisruptorConfiguration());
}

/**
Expand All @@ -137,7 +136,7 @@ public DisruptorCommandBus(EventStore eventStore, EventBus eventBus) {
* @param configuration The configuration for the command bus
*/
@SuppressWarnings("unchecked")
public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration) {
public DisruptorCommandBus(EventStore eventStore, DisruptorConfiguration configuration) {
Assert.notNull(eventStore, "eventStore may not be null");
Assert.notNull(configuration, "configuration may not be null");
Executor executor = configuration.getExecutor();
Expand All @@ -157,9 +156,9 @@ public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorCo
commandTargetResolver = configuration.getCommandTargetResolver();

// configure invoker Threads
commandHandlerInvokers = initializeInvokerThreads(eventStore, eventBus, configuration);
commandHandlerInvokers = initializeInvokerThreads(eventStore, configuration);
// configure publisher Threads
EventPublisher[] publishers = initializePublisherThreads(eventStore, configuration, executor,
EventPublisher[] publishers = initializePublisherThreads(configuration, executor,
transactionManager);
publisherCount = publishers.length;
disruptor.handleExceptionsWith(new ExceptionHandler());
Expand All @@ -170,8 +169,8 @@ public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorCo
disruptor.start();
}

private EventPublisher[] initializePublisherThreads(EventStore eventStore, DisruptorConfiguration configuration,
Executor executor, TransactionManager transactionManager) {
private EventPublisher[] initializePublisherThreads(DisruptorConfiguration configuration, Executor executor,
TransactionManager transactionManager) {
EventPublisher[] publishers = new EventPublisher[configuration.getPublisherThreadCount()];
for (int t = 0; t < publishers.length; t++) {
publishers[t] = new EventPublisher(executor, transactionManager,
Expand All @@ -180,8 +179,7 @@ private EventPublisher[] initializePublisherThreads(EventStore eventStore, Disru
return publishers;
}

private CommandHandlerInvoker[] initializeInvokerThreads(EventStore eventStore, EventBus eventBus,
DisruptorConfiguration configuration) {
private CommandHandlerInvoker[] initializeInvokerThreads(EventStore eventStore, DisruptorConfiguration configuration) {
CommandHandlerInvoker[] invokers;
invokers = new CommandHandlerInvoker[configuration.getInvokerThreadCount()];
for (int t = 0; t < invokers.length; t++) {
Expand Down Expand Up @@ -365,8 +363,8 @@ private static class NoOpEventStreamDecorator implements EventStreamDecorator {
public static final EventStreamDecorator INSTANCE = new NoOpEventStreamDecorator();

@Override
public Stream<? extends DomainEventMessage<?>> decorateForRead(String aggregateIdentifier,
Stream<? extends DomainEventMessage<?>> eventStream) {
public DomainEventStream decorateForRead(String aggregateIdentifier,
DomainEventStream eventStream) {
return eventStream;
}

Expand Down
92 changes: 0 additions & 92 deletions core/src/main/java/org/axonframework/common/PeekingIterator.java

This file was deleted.

Expand Up @@ -16,13 +16,15 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static org.axonframework.messaging.unitofwork.UnitOfWork.Phase.*;

/**
* Base class for the Event Bus. In case events are published while a Unit of Work is active the Unit of Work root
* coordinates the timing and order of the publication.
* <p>
* This implementation of the {@link EventBus} directly forwards all published events (in the callers' thread) to
* subscribed event processors. Event processors are expected to implement asynchronous handling themselves or
* alternatively open an event stream using {@link #readEvents(TrackingToken)}.
* alternatively open an event stream using {@link #streamEvents(TrackingToken)}.
*
* @author Allard Buijze
* @author René de Waele
Expand Down Expand Up @@ -69,35 +71,35 @@ public Registration registerDispatchInterceptor(MessageDispatchInterceptor<Event
}

@Override
public void publish(List<EventMessage<?>> events) {
public void publish(List<? extends EventMessage<?>> events) {
if (CurrentUnitOfWork.isStarted()) {
UnitOfWork<?> unitOfWork = CurrentUnitOfWork.get();
Assert.state(!unitOfWork.phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT),
Assert.state(!unitOfWork.phase().isAfter(PREPARE_COMMIT),
"It is not allowed to publish events when the current Unit of Work has already been committed. " +
"Please start a new Unit of Work before publishing events.");
Assert.state(!unitOfWork.root().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT),
Assert.state(!unitOfWork.root().phase().isAfter(PREPARE_COMMIT),
"It is not allowed to publish events when the root Unit of Work has already been committed.");

unitOfWork.getOrComputeResource(eventsKey, r -> {

List<EventMessage<?>> eventQueue = new ArrayList<>();

unitOfWork.onPrepareCommit(u -> {
if (u.parent().isPresent() && !u.root().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT)) {
if (u.parent().isPresent() && !u.root().phase().isAfter(PREPARE_COMMIT)) {
u.root().onPrepareCommit(w -> doWithEvents(this::prepareCommit, intercept(eventQueue)));
} else {
doWithEvents(this::prepareCommit, intercept(eventQueue));
}
});
unitOfWork.onCommit(u -> {
if (u.parent().isPresent() && !u.root().phase().isAfter(UnitOfWork.Phase.COMMIT)) {
if (u.parent().isPresent() && !u.root().phase().isAfter(COMMIT)) {
u.root().onCommit(w -> doWithEvents(this::commit, eventQueue));
} else {
doWithEvents(this::commit, eventQueue);
}
});
unitOfWork.afterCommit(u -> {
if (u.parent().isPresent() && !u.root().phase().isAfter(UnitOfWork.Phase.AFTER_COMMIT)) {
if (u.parent().isPresent() && !u.root().phase().isAfter(AFTER_COMMIT)) {
u.root().afterCommit(w -> doWithEvents(this::afterCommit, eventQueue));
} else {
doWithEvents(this::afterCommit, eventQueue);
Expand All @@ -120,7 +122,7 @@ public void publish(List<EventMessage<?>> events) {
* @param events The original events being published
* @return The events to actually publish
*/
protected List<EventMessage<?>> intercept(List<EventMessage<?>> events) {
protected List<? extends EventMessage<?>> intercept(List<? extends EventMessage<?>> events) {
List<EventMessage<?>> preprocessedEvents = new ArrayList<>(events);
for (MessageDispatchInterceptor<EventMessage<?>> preprocessor : dispatchInterceptors) {
Function<Integer, EventMessage<?>> function = preprocessor.handle(preprocessedEvents);
Expand All @@ -131,7 +133,7 @@ protected List<EventMessage<?>> intercept(List<EventMessage<?>> events) {
return preprocessedEvents;
}

private void doWithEvents(Consumer<List<EventMessage<?>>> eventsConsumer, List<EventMessage<?>> events) {
private void doWithEvents(Consumer<List<? extends EventMessage<?>>> eventsConsumer, List<? extends EventMessage<?>> events) {
if (CurrentUnitOfWork.isStarted()) {
CurrentUnitOfWork.get().resources().remove(eventsKey);
}
Expand All @@ -140,11 +142,11 @@ private void doWithEvents(Consumer<List<EventMessage<?>>> eventsConsumer, List<E

/**
* Process given <code>events</code> while the Unit of Work root is preparing for commit. The default implementation
* does nothing.
* passes the events to each registered event processor.
*
* @param events Events to be published by this Event Bus
*/
protected void prepareCommit(List<EventMessage<?>> events) {
protected void prepareCommit(List<? extends EventMessage<?>> events) {
for (Consumer<List<? extends EventMessage<?>>> eventProcessor : eventProcessors) {
eventProcessor.accept(events);
}
Expand All @@ -156,7 +158,7 @@ protected void prepareCommit(List<EventMessage<?>> events) {
*
* @param events Events to be published by this Event Bus
*/
protected void commit(List<EventMessage<?>> events) {
protected void commit(List<? extends EventMessage<?>> events) {
}

/**
Expand All @@ -165,6 +167,6 @@ protected void commit(List<EventMessage<?>> events) {
*
* @param events Events to be published by this Event Bus
*/
protected void afterCommit(List<EventMessage<?>> events) {
protected void afterCommit(List<? extends EventMessage<?>> events) {
}
}
Expand Up @@ -17,13 +17,13 @@
package org.axonframework.eventhandling;

import org.axonframework.common.Registration;
import org.axonframework.eventstore.TrackingEventStream;
import org.axonframework.eventstore.TrackingToken;
import org.axonframework.messaging.MessageDispatchInterceptor;

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* Specification of the mechanism on which the Event Listeners can subscribe for events and event publishers can publish
Expand Down Expand Up @@ -55,7 +55,7 @@ public interface EventBus {
* @return a stream of events since the given trackingToken
* @throws UnsupportedOperationException in case this event bus does not support streaming from given token
*/
Stream<? extends TrackedEventMessage<?>> readEvents(TrackingToken trackingToken);
TrackingEventStream streamEvents(TrackingToken trackingToken);

/**
* Publish a collection of events on this bus (one, or multiple). The events will be dispatched to all subscribed
Expand All @@ -79,7 +79,7 @@ default void publish(EventMessage<?>... events) {
*
* @param events The collection of events to publish
*/
void publish(List<EventMessage<?>> events);
void publish(List<? extends EventMessage<?>> events);

/**
* Subscribe the given <code>eventProcessor</code> to this bus. When subscribed, it will receive all events
Expand Down

0 comments on commit e0e152f

Please sign in to comment.