Skip to content

Commit

Permalink
Add default event store implementation that periodically polls for ne…
Browse files Browse the repository at this point in the history
…w events

 If a consumer reaches the end of the global event stream the event store blocks the consumer thread until the store has new events.
  • Loading branch information
renedewaele committed Apr 21, 2016
1 parent fbec2f3 commit 88fc2ce
Show file tree
Hide file tree
Showing 32 changed files with 400 additions and 293 deletions.
Expand Up @@ -23,7 +23,6 @@
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.EventProcessorMetaData;
import org.axonframework.eventhandling.amqp.*;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
Expand All @@ -41,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration.AMQP_CONFIG_PROPERTY;

Expand Down Expand Up @@ -177,7 +177,7 @@ private void tryRollback(Channel channel) {
}

@Override
public Registration subscribe(EventProcessor eventProcessor) {
public Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor) {
EventProcessorMetaData processorMetaData = eventProcessor.getMetaData();
AMQPConsumerConfiguration config;
if (processorMetaData.getProperty(AMQP_CONFIG_PROPERTY) instanceof AMQPConsumerConfiguration) {
Expand Down
Expand Up @@ -33,40 +33,22 @@ public abstract class AbstractEventBus implements EventBus {
private static final Logger logger = LoggerFactory.getLogger(AbstractEventBus.class);

final String eventsKey = this + "_EVENTS";
private final Set<EventProcessor> eventProcessors = new CopyOnWriteArraySet<>();
private final Set<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArraySet<>();
private final Set<MessageDispatchInterceptor<EventMessage<?>>> dispatchInterceptors = new CopyOnWriteArraySet<>();
private final PublicationStrategy publicationStrategy;

/**
* Initializes an event bus with a {@link PublicationStrategy} that forwards events to all subscribed event
* processors.
*/
public AbstractEventBus() {
this(new DirectTerminal());
}

/**
* Initializes an event bus with given {@link PublicationStrategy}.
*
* @param publicationStrategy The strategy used by the event bus to publish events to listeners
*/
public AbstractEventBus(PublicationStrategy publicationStrategy) {
this.publicationStrategy = publicationStrategy;
}

@Override
public Registration subscribe(EventProcessor eventProcessor) {
public Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor) {
if (this.eventProcessors.add(eventProcessor)) {
logger.debug("EventProcessor [{}] subscribed successfully", eventProcessor.getName());
logger.debug("EventProcessor [{}] subscribed successfully", eventProcessor);
} else {
logger.info("EventProcessor [{}] not added. It was already subscribed", eventProcessor.getName());
logger.info("EventProcessor [{}] not added. It was already subscribed", eventProcessor);
}
return () -> {
if (eventProcessors.remove(eventProcessor)) {
logger.debug("EventListener {} unsubscribed successfully", eventProcessor.getName());
logger.debug("EventListener {} unsubscribed successfully", eventProcessor);
return true;
} else {
logger.info("EventListener {} not removed. It was already unsubscribed", eventProcessor.getName());
logger.info("EventListener {} not removed. It was already unsubscribed", eventProcessor);
return false;
}
};
Expand Down Expand Up @@ -163,7 +145,9 @@ private void doWithEvents(Consumer<List<EventMessage<?>>> eventsConsumer, List<E
* @param events Events to be published by this Event Bus
*/
protected void prepareCommit(List<EventMessage<?>> events) {
publicationStrategy.publish(events, eventProcessors);
for (Consumer<List<? extends EventMessage<?>>> eventProcessor : eventProcessors) {
eventProcessor.accept(events);
}
}

/**
Expand All @@ -183,13 +167,4 @@ protected void commit(List<EventMessage<?>> events) {
*/
protected void afterCommit(List<EventMessage<?>> events) {
}

private static class DirectTerminal implements PublicationStrategy {
@Override
public void publish(List<EventMessage<?>> events, Set<EventProcessor> eventProcessors) {
for (EventProcessor eventProcessor : eventProcessors) {
eventProcessor.handle(events);
}
}
}
}
Expand Up @@ -22,6 +22,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -91,7 +92,7 @@ default void publish(EventMessage<?>... events) {
* events.
* @throws EventListenerSubscriptionFailedException if the listener could not be subscribed
*/
Registration subscribe(EventProcessor eventProcessor);
Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor);

/**
* Register the given <code>interceptor</code> with this bus. When subscribed it will intercept any event messages
Expand Down

This file was deleted.

Expand Up @@ -46,23 +46,14 @@ public class SimpleEventBus extends AbstractEventBus {
private static final int DEFAULT_QUEUE_CAPACITY = Integer.MAX_VALUE;

private final Collection<EventStreamSpliterator> eventReaders = new CopyOnWriteArraySet<>();
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private final int queueCapacity;

/**
* Initializes an event bus with a {@link PublicationStrategy} that forwards events to all subscribed event
* processors.
*/
public SimpleEventBus() {
super();
this(DEFAULT_QUEUE_CAPACITY);
}

/**
* Initializes an event bus with given {@link PublicationStrategy}.
*
* @param publicationStrategy The strategy used by the event bus to publish events to listeners
*/
public SimpleEventBus(PublicationStrategy publicationStrategy) {
super(publicationStrategy);
public SimpleEventBus(int queueCapacity) {
this.queueCapacity = queueCapacity;
}

@Override
Expand Down Expand Up @@ -91,11 +82,7 @@ public Stream<? extends TrackedEventMessage<?>> readEvents(TrackingToken trackin
return stream;
}

public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}

private static class EventStreamSpliterator extends Spliterators.AbstractSpliterator<TrackedEventMessage<?>> {
private class EventStreamSpliterator extends Spliterators.AbstractSpliterator<TrackedEventMessage<?>> {
private final BlockingQueue<TrackedEventMessage<?>> eventQueue;

private EventStreamSpliterator(int queueCapacity) {
Expand All @@ -105,7 +92,14 @@ private EventStreamSpliterator(int queueCapacity) {

private void addEvents(List<EventMessage<?>> events) {
//add one by one because bulk operations on LinkedBlockingQueues are not thread-safe
events.forEach(eventMessage -> eventQueue.offer(asTrackedEventMessage(eventMessage, null)));
events.forEach(eventMessage -> {
try {
eventQueue.put(asTrackedEventMessage(eventMessage, null));
} catch (InterruptedException e) {
logger.warn("Event producer thread was interrupted. Shutting down.", e);
Thread.currentThread().interrupt();
}
});
}

@Override
Expand All @@ -114,6 +108,7 @@ public boolean tryAdvance(Consumer<? super TrackedEventMessage<?>> action) {
action.accept(eventQueue.take());
} catch (InterruptedException e) {
logger.warn("Thread was interrupted while waiting for the next item on the queue", e);
Thread.currentThread().interrupt();
return false;
}
return true;
Expand Down
Expand Up @@ -15,7 +15,6 @@

import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.PublicationStrategy;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,11 +35,6 @@ public AbstractEventStore(EventStorageEngine storageEngine) {
this.storageEngine = storageEngine;
}

public AbstractEventStore(PublicationStrategy publicationStrategy, EventStorageEngine storageEngine) {
super(publicationStrategy);
this.storageEngine = storageEngine;
}

@Override
protected void commit(List<EventMessage<?>> events) {
storageEngine.appendEvents(events);
Expand Down
Expand Up @@ -87,7 +87,7 @@ private static class EventStreamSpliterator<T> extends Spliterators.AbstractSpli
private int lastBatchSize;

private EventStreamSpliterator(Function<T, List<T>> fetchFunction, int batchSize) {
super(Long.MAX_VALUE, NONNULL | ORDERED | DISTINCT);
super(Long.MAX_VALUE, NONNULL | ORDERED | DISTINCT | CONCURRENT);
this.fetchFunction = fetchFunction;
this.batchSize = batchSize;
}
Expand Down

0 comments on commit 88fc2ce

Please sign in to comment.