diff --git a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPEventBus.java b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPEventBus.java index e416cdd813..5b77381c41 100644 --- a/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPEventBus.java +++ b/amqp/src/main/java/org/axonframework/eventhandling/amqp/spring/SpringAMQPEventBus.java @@ -23,7 +23,6 @@ import org.axonframework.common.Registration; import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.EventProcessor; import org.axonframework.eventhandling.EventProcessorMetaData; import org.axonframework.eventhandling.amqp.*; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; @@ -41,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import static org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration.AMQP_CONFIG_PROPERTY; @@ -177,7 +177,7 @@ private void tryRollback(Channel channel) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { EventProcessorMetaData processorMetaData = eventProcessor.getMetaData(); AMQPConsumerConfiguration config; if (processorMetaData.getProperty(AMQP_CONFIG_PROPERTY) instanceof AMQPConsumerConfiguration) { diff --git a/core/src/main/java/org/axonframework/eventhandling/AbstractEventBus.java b/core/src/main/java/org/axonframework/eventhandling/AbstractEventBus.java index 89205cffaf..40ed6d9c17 100644 --- a/core/src/main/java/org/axonframework/eventhandling/AbstractEventBus.java +++ b/core/src/main/java/org/axonframework/eventhandling/AbstractEventBus.java @@ -33,40 +33,22 @@ public abstract class AbstractEventBus implements EventBus { private static final Logger logger = LoggerFactory.getLogger(AbstractEventBus.class); final String eventsKey = this + "_EVENTS"; - private final Set eventProcessors = new CopyOnWriteArraySet<>(); + private final Set>>> eventProcessors = new CopyOnWriteArraySet<>(); private final Set>> dispatchInterceptors = new CopyOnWriteArraySet<>(); - private final PublicationStrategy publicationStrategy; - - /** - * Initializes an event bus with a {@link PublicationStrategy} that forwards events to all subscribed event - * processors. - */ - public AbstractEventBus() { - this(new DirectTerminal()); - } - - /** - * Initializes an event bus with given {@link PublicationStrategy}. - * - * @param publicationStrategy The strategy used by the event bus to publish events to listeners - */ - public AbstractEventBus(PublicationStrategy publicationStrategy) { - this.publicationStrategy = publicationStrategy; - } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { if (this.eventProcessors.add(eventProcessor)) { - logger.debug("EventProcessor [{}] subscribed successfully", eventProcessor.getName()); + logger.debug("EventProcessor [{}] subscribed successfully", eventProcessor); } else { - logger.info("EventProcessor [{}] not added. It was already subscribed", eventProcessor.getName()); + logger.info("EventProcessor [{}] not added. It was already subscribed", eventProcessor); } return () -> { if (eventProcessors.remove(eventProcessor)) { - logger.debug("EventListener {} unsubscribed successfully", eventProcessor.getName()); + logger.debug("EventListener {} unsubscribed successfully", eventProcessor); return true; } else { - logger.info("EventListener {} not removed. It was already unsubscribed", eventProcessor.getName()); + logger.info("EventListener {} not removed. It was already unsubscribed", eventProcessor); return false; } }; @@ -163,7 +145,9 @@ private void doWithEvents(Consumer>> eventsConsumer, List> events) { - publicationStrategy.publish(events, eventProcessors); + for (Consumer>> eventProcessor : eventProcessors) { + eventProcessor.accept(events); + } } /** @@ -183,13 +167,4 @@ protected void commit(List> events) { */ protected void afterCommit(List> events) { } - - private static class DirectTerminal implements PublicationStrategy { - @Override - public void publish(List> events, Set eventProcessors) { - for (EventProcessor eventProcessor : eventProcessors) { - eventProcessor.handle(events); - } - } - } } diff --git a/core/src/main/java/org/axonframework/eventhandling/EventBus.java b/core/src/main/java/org/axonframework/eventhandling/EventBus.java index 5d534a9557..0ffa12925c 100644 --- a/core/src/main/java/org/axonframework/eventhandling/EventBus.java +++ b/core/src/main/java/org/axonframework/eventhandling/EventBus.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -91,7 +92,7 @@ default void publish(EventMessage... events) { * events. * @throws EventListenerSubscriptionFailedException if the listener could not be subscribed */ - Registration subscribe(EventProcessor eventProcessor); + Registration subscribe(Consumer>> eventProcessor); /** * Register the given interceptor with this bus. When subscribed it will intercept any event messages diff --git a/core/src/main/java/org/axonframework/eventhandling/PublicationStrategy.java b/core/src/main/java/org/axonframework/eventhandling/PublicationStrategy.java deleted file mode 100644 index 722944a408..0000000000 --- a/core/src/main/java/org/axonframework/eventhandling/PublicationStrategy.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2010-2015. Axon Framework - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.axonframework.eventhandling; - -import java.util.List; -import java.util.Set; - -/** - * Interface describing a mechanism that connects {@link EventProcessor EventProcessors}. The terminal is - * responsible for delivering published Events with all of the event processors available in the Event Bus - * (either locally, or remotely). - *

- * Terminals are typically bound to a single Event Bus instance, but may be aware that multiple instances exist in - * order to form a bridge between these Event Buses. - * - * @author Allard Buijze - * @since 1.2 - */ -public interface PublicationStrategy { - - /** - * Publishes the given events to all event processors on the Event Bus. The terminal is responsible - * for the delivery process, albeit local or remote. - * - * @param events the collections of events to publish - */ - void publish(List> events, Set eventProcessors); -} diff --git a/core/src/main/java/org/axonframework/eventhandling/SimpleEventBus.java b/core/src/main/java/org/axonframework/eventhandling/SimpleEventBus.java index 934946d4ac..7199077d32 100644 --- a/core/src/main/java/org/axonframework/eventhandling/SimpleEventBus.java +++ b/core/src/main/java/org/axonframework/eventhandling/SimpleEventBus.java @@ -46,23 +46,14 @@ public class SimpleEventBus extends AbstractEventBus { private static final int DEFAULT_QUEUE_CAPACITY = Integer.MAX_VALUE; private final Collection eventReaders = new CopyOnWriteArraySet<>(); - private int queueCapacity = DEFAULT_QUEUE_CAPACITY; + private final int queueCapacity; - /** - * Initializes an event bus with a {@link PublicationStrategy} that forwards events to all subscribed event - * processors. - */ public SimpleEventBus() { - super(); + this(DEFAULT_QUEUE_CAPACITY); } - /** - * Initializes an event bus with given {@link PublicationStrategy}. - * - * @param publicationStrategy The strategy used by the event bus to publish events to listeners - */ - public SimpleEventBus(PublicationStrategy publicationStrategy) { - super(publicationStrategy); + public SimpleEventBus(int queueCapacity) { + this.queueCapacity = queueCapacity; } @Override @@ -91,11 +82,7 @@ public Stream> readEvents(TrackingToken trackin return stream; } - public void setQueueCapacity(int queueCapacity) { - this.queueCapacity = queueCapacity; - } - - private static class EventStreamSpliterator extends Spliterators.AbstractSpliterator> { + private class EventStreamSpliterator extends Spliterators.AbstractSpliterator> { private final BlockingQueue> eventQueue; private EventStreamSpliterator(int queueCapacity) { @@ -105,7 +92,14 @@ private EventStreamSpliterator(int queueCapacity) { private void addEvents(List> events) { //add one by one because bulk operations on LinkedBlockingQueues are not thread-safe - events.forEach(eventMessage -> eventQueue.offer(asTrackedEventMessage(eventMessage, null))); + events.forEach(eventMessage -> { + try { + eventQueue.put(asTrackedEventMessage(eventMessage, null)); + } catch (InterruptedException e) { + logger.warn("Event producer thread was interrupted. Shutting down.", e); + Thread.currentThread().interrupt(); + } + }); } @Override @@ -114,6 +108,7 @@ public boolean tryAdvance(Consumer> action) { action.accept(eventQueue.take()); } catch (InterruptedException e) { logger.warn("Thread was interrupted while waiting for the next item on the queue", e); + Thread.currentThread().interrupt(); return false; } return true; diff --git a/core/src/main/java/org/axonframework/eventstore/AbstractEventStore.java b/core/src/main/java/org/axonframework/eventstore/AbstractEventStore.java index 9f5af0b41a..efa42f2ed2 100644 --- a/core/src/main/java/org/axonframework/eventstore/AbstractEventStore.java +++ b/core/src/main/java/org/axonframework/eventstore/AbstractEventStore.java @@ -15,7 +15,6 @@ import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.PublicationStrategy; import org.axonframework.eventsourcing.DomainEventMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +35,6 @@ public AbstractEventStore(EventStorageEngine storageEngine) { this.storageEngine = storageEngine; } - public AbstractEventStore(PublicationStrategy publicationStrategy, EventStorageEngine storageEngine) { - super(publicationStrategy); - this.storageEngine = storageEngine; - } - @Override protected void commit(List> events) { storageEngine.appendEvents(events); diff --git a/core/src/main/java/org/axonframework/eventstore/BatchingEventStorageEngine.java b/core/src/main/java/org/axonframework/eventstore/BatchingEventStorageEngine.java index 76e924bf59..883adf238e 100644 --- a/core/src/main/java/org/axonframework/eventstore/BatchingEventStorageEngine.java +++ b/core/src/main/java/org/axonframework/eventstore/BatchingEventStorageEngine.java @@ -87,7 +87,7 @@ private static class EventStreamSpliterator extends Spliterators.AbstractSpli private int lastBatchSize; private EventStreamSpliterator(Function> fetchFunction, int batchSize) { - super(Long.MAX_VALUE, NONNULL | ORDERED | DISTINCT); + super(Long.MAX_VALUE, NONNULL | ORDERED | DISTINCT | CONCURRENT); this.fetchFunction = fetchFunction; this.batchSize = batchSize; } diff --git a/core/src/main/java/org/axonframework/eventstore/EmbeddedEventStore.java b/core/src/main/java/org/axonframework/eventstore/EmbeddedEventStore.java index 68cc8d01dc..d25f64f877 100644 --- a/core/src/main/java/org/axonframework/eventstore/EmbeddedEventStore.java +++ b/core/src/main/java/org/axonframework/eventstore/EmbeddedEventStore.java @@ -13,81 +13,340 @@ package org.axonframework.eventstore; +import org.axonframework.common.AxonThreadFactory; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.PublicationStrategy; import org.axonframework.eventhandling.TrackedEventMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Comparator.comparing; /** * @author Rene de Waele */ public class EmbeddedEventStore extends AbstractEventStore { + private static final Logger logger = LoggerFactory.getLogger(EmbeddedEventStore.class); + private static final ThreadGroup THREAD_GROUP = new ThreadGroup(EmbeddedEventStore.class.getSimpleName()); + + private final Lock consumerLock = new ReentrantLock(); + private final Condition consumableEventsCondition = consumerLock.newCondition(); + private final Set tailingConsumers = new CopyOnWriteArraySet<>(); + private final Producer producer; + private final long cleanupDelayMillis; + private final ThreadFactory threadFactory; + private final ScheduledExecutorService cleanupService; - private static final int DEFAULT_BACKLOG_LENGTH = 1024; - private final EventCache eventCache; + private volatile Node oldest; public EmbeddedEventStore(EventStorageEngine storageEngine) { + this(storageEngine, 10000, 1000L, 10000L, TimeUnit.MILLISECONDS); + } + + public EmbeddedEventStore(EventStorageEngine storageEngine, int cachedEvents, long fetchDelay, long cleanupDelay, + TimeUnit timeUnit) { super(storageEngine); - eventCache = new EventCache(DEFAULT_BACKLOG_LENGTH); + threadFactory = new AxonThreadFactory(THREAD_GROUP); + cleanupService = Executors.newScheduledThreadPool(1, threadFactory); + producer = new Producer(timeUnit.toNanos(fetchDelay), cachedEvents); + cleanupDelayMillis = timeUnit.toMillis(cleanupDelay); } - public EmbeddedEventStore(PublicationStrategy publicationStrategy, EventStorageEngine storageEngine) { - this(publicationStrategy, storageEngine, DEFAULT_BACKLOG_LENGTH); + @PostConstruct + public void initialize() { + threadFactory.newThread(producer::tryFetch); + cleanupService + .scheduleWithFixedDelay(new Cleaner(), cleanupDelayMillis, cleanupDelayMillis, TimeUnit.MILLISECONDS); } - public EmbeddedEventStore(PublicationStrategy publicationStrategy, EventStorageEngine storageEngine, - int backlogLength) { - super(publicationStrategy, storageEngine); - eventCache = new EventCache(backlogLength); + @PreDestroy + public void destroy() { + cleanupService.shutdownNow(); + oldest = null; } @Override protected void afterCommit(List> events) { - //todo wake up waiting polling stream + producer.fetchIfWaiting(); } @Override public Stream> readEvents(TrackingToken trackingToken) { - //todo - return null; + Node node = findNode(trackingToken); + EventConsumer eventConsumer; + if (node != null) { + eventConsumer = new EventConsumer(node); + tailingConsumers.add(eventConsumer); + } else { + eventConsumer = new EventConsumer(trackingToken); + } + return StreamSupport.stream(eventConsumer, false).onClose(eventConsumer::closePrivateStream); + } + + private Node findNode(TrackingToken trackingToken) { + Node oldest = this.oldest; + return oldest == null ? null : oldest.find(trackingToken); + } + + private class Producer { + private final Lock lock = new ReentrantLock(); + private final Condition dataAvailableCondition = lock.newCondition(); + private final long fetchDelayNanos; + private final int cachedEvents; + private volatile boolean fetching, shouldFetch; + private Node newest; + + private Producer(long fetchDelayNanos, int cachedEvents) { + this.fetchDelayNanos = fetchDelayNanos; + this.cachedEvents = cachedEvents; + } + + private void fetchIfWaiting() { + shouldFetch = true; + if (!fetching) { + lock.lock(); + try { + dataAvailableCondition.signal(); + } finally { + lock.unlock(); + } + } + } + + private void tryFetch() { + shouldFetch = true; + fetching = true; + while (shouldFetch) { + shouldFetch = false; + if (!tailingConsumers.isEmpty()) { + if (newest == null) { + newest = tailingConsumers.stream().map(consumer -> consumer.lastMessage) + .min(comparing(TrackedEventMessage::trackingToken, Comparator.naturalOrder())) + .map(event -> new Node(0, event)).orElse(null); + } + storageEngine().readEvents(lastToken()).forEach(event -> { + Node node = new Node(nextIndex(), event); + newest.next = node; + newest = node; + if (oldest == null) { + oldest = node; + } + notifyConsumers(); + }); + trimCache(); + } + } + fetching = false; + if (tailingConsumers.isEmpty()) { + newest = null; + } + delayedFetch(); + } + + private TrackingToken lastToken() { + return newest == null ? null : newest.event.trackingToken(); + } + + private long nextIndex() { + return newest == null ? 0 : newest.index + 1; + } + + private void delayedFetch() { + lock.lock(); + try { + dataAvailableCondition.awaitNanos(fetchDelayNanos); + } catch (InterruptedException e) { + logger.warn("Producer thread was interrupted. Shutting down event store.", e); + Thread.currentThread().interrupt(); + return; + } finally { + lock.unlock(); + } + tryFetch(); + } + + private void notifyConsumers() { + consumerLock.lock(); + try { + consumableEventsCondition.signalAll(); + } finally { + consumerLock.unlock(); + } + } + + private void trimCache() { + while (newest != null && newest.index - oldest.index > cachedEvents) { + oldest = oldest.next; + } + } } - private class Consumer { - private Node head, tail; + private class EventConsumer extends Spliterators.AbstractSpliterator> { + private final TrackingToken startToken; + private Stream> privateStream; + private Spliterator> privateSpliterator; + private volatile TrackedEventMessage lastMessage; + private volatile Node lastNode; + + private EventConsumer(Node lastNode) { + this(lastNode.event.trackingToken()); + this.lastNode = lastNode; + this.lastMessage = lastNode.event; + } + + private EventConsumer(TrackingToken startToken) { + super(Long.MAX_VALUE, NONNULL | ORDERED | DISTINCT); + this.startToken = startToken; + } + + @Override + public boolean tryAdvance(Consumer> action) { + if (tailingConsumers.contains(this)) { + Node nextNode; + if ((nextNode = nextNode()) == null) { + consumerLock.lock(); + try { + while ((nextNode = nextNode()) == null) { + if (!tailingConsumers.contains(this)) { + return tryAdvance(action); //cleaner removed consumer: reopen a private stream + } + waitForEvents(); + } + } catch (InterruptedException e) { + return false; + } finally { + consumerLock.unlock(); + } + } + lastNode = nextNode; + lastMessage = nextNode.event; + action.accept(nextNode.event); + return true; + } else { + TrackingToken lastToken = lastToken(); + if (privateSpliterator == null) { + privateStream = storageEngine().readEvents(lastToken); + privateSpliterator = privateStream.spliterator(); + } + if (privateSpliterator.tryAdvance(event -> action.accept(lastMessage = event))) { + return true; + } else if (lastMessage == null) { + consumerLock.lock(); + try { + waitForEvents(); + } catch (InterruptedException e) { + return false; + } finally { + consumerLock.unlock(); + } + return tryAdvance(action); + } else { + closePrivateStream(); + tailingConsumers.add(this); + return tryAdvance(action); + } + } + } + + private void waitForEvents() throws InterruptedException { + try { + consumableEventsCondition.await(); + } catch (InterruptedException e) { + logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e); + Thread.currentThread().interrupt(); + throw e; + } + } + + private Node nextNode() { + Node current; + if ((current = lastNode) == null && (current = lastNode = findNode(lastToken())) == null) { + return null; + } + return current.next; + } + + + private void closePrivateStream() { + Optional.ofNullable(privateStream).ifPresent(stream -> { + privateStream = null; + privateSpliterator = null; + stream.close(); + }); + } + + private TrackingToken lastToken() { + return lastMessage == null ? startToken : lastMessage.trackingToken(); + } + } + private class Cleaner implements Runnable { + @Override + public void run() { + Node currentOldest = oldest; + if (currentOldest == null) { + return; + } + Iterator iterator = tailingConsumers.iterator(); + while (iterator.hasNext()) { + EventConsumer consumer = iterator.next(); + Node lastNode = consumer.lastNode; + if (lastNode != null && currentOldest.index > lastNode.index) { + iterator.remove(); + consumer.lastNode = null; //make old nodes garbage collectable + } + } + } } private static class Node { + private final long index; private final TrackedEventMessage event; - private final AtomicReference> next = new AtomicReference<>(); + private volatile Node next; - private Node(TrackedEventMessage event) { + private Node(long index, TrackedEventMessage event) { + this.index = index; this.event = event; } + + private Node find(TrackingToken trackingToken) { + if (event.trackingToken().isAfter(trackingToken)) { + return null; + } + Node node = this; + while (node != null && !node.event.trackingToken().equals(trackingToken)) { + node = node.next; + } + return node; + } } - private static class EventCache extends LinkedHashMap> { - private final int maxSize; - private int size; + private static class ConsumerEventStream implements EventStream { + + private final EventConsumer eventConsumer; - private EventCache(int maxSize) { - super(maxSize); - this.maxSize = maxSize; + public ConsumerEventStream(EventConsumer eventConsumer) { + this.eventConsumer = eventConsumer; } - private void add(TrackedEventMessage event) { - size++; - put(event.trackingToken(), event); + @Override + public Stream> all() { + return StreamSupport.stream(eventConsumer, false).onClose(eventConsumer::closePrivateStream); } @Override - protected boolean removeEldestEntry(Map.Entry> eldest) { - return size > maxSize; + public Stream> batch(int maxSize) { + return null; } } } diff --git a/core/src/main/java/org/axonframework/eventstore/EventStream.java b/core/src/main/java/org/axonframework/eventstore/EventStream.java new file mode 100644 index 0000000000..6156792888 --- /dev/null +++ b/core/src/main/java/org/axonframework/eventstore/EventStream.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2010-2016. Axon Framework + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.eventstore; + +import org.axonframework.eventhandling.TrackedEventMessage; + +import java.util.stream.Stream; + +/** + * @author Rene de Waele + */ +public interface EventStream { + + Stream> all(); + + Stream> batch(int maxSize); + +} diff --git a/core/src/main/java/org/axonframework/eventstore/StupidEventStore.java b/core/src/main/java/org/axonframework/eventstore/StupidEventStore.java deleted file mode 100644 index 6f1a8530e6..0000000000 --- a/core/src/main/java/org/axonframework/eventstore/StupidEventStore.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2010-2016. Axon Framework - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.axonframework.eventstore; - -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.PublicationStrategy; -import org.axonframework.eventhandling.TrackedEventMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.function.Consumer; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -/** - * @author Rene de Waele - */ -public class StupidEventStore extends AbstractEventStore { - - private static final long DEFAULT_POLLING_INTERVAL = 1000L; - private static final Logger logger = LoggerFactory.getLogger(StupidEventStore.class); - - private long pollingInterval = DEFAULT_POLLING_INTERVAL; - - public StupidEventStore(EventStorageEngine storageEngine) { - super(storageEngine); - } - - public StupidEventStore(PublicationStrategy publicationStrategy, EventStorageEngine storageEngine) { - super(publicationStrategy, storageEngine); - } - - @Override - protected void afterCommit(List> events) { - //todo - } - - @Override - public Stream> readEvents(TrackingToken trackingToken) { - EventStreamSpliterator spliterator = new EventStreamSpliterator(trackingToken); - return StreamSupport.stream(spliterator, false); - } - - public void setPollingInterval(long pollingInterval) { - this.pollingInterval = pollingInterval; - } - - private class EventStreamSpliterator extends Spliterators.AbstractSpliterator> { - - //todo close event stream when downstream closes - - private Spliterator> delegate; - private final Consumer> eventConsumer = event -> lastToken = event.trackingToken(); - private TrackingToken lastToken; - - private EventStreamSpliterator(TrackingToken trackingToken) { - this(storageEngine().readEvents(trackingToken).spliterator(), trackingToken); - } - - private EventStreamSpliterator(Spliterator> delegate, TrackingToken startToken) { - super(delegate.estimateSize(), delegate.characteristics()); - this.delegate = delegate; - this.lastToken = startToken; - } - - @Override - public boolean tryAdvance(Consumer> action) { - if (!delegate.tryAdvance(eventConsumer.andThen(action))) { - try { - Thread.sleep(pollingInterval); - delegate = storageEngine().readEvents(lastToken).spliterator(); - return tryAdvance(action); - } catch (InterruptedException e) { - logger.warn("Reader thread interrupted", e); - return false; - } - } - return true; - } - } -} diff --git a/core/src/test/java/org/axonframework/commandhandling/CommandHandlingTest.java b/core/src/test/java/org/axonframework/commandhandling/CommandHandlingTest.java index 1769418c89..c8ce777b4c 100644 --- a/core/src/test/java/org/axonframework/commandhandling/CommandHandlingTest.java +++ b/core/src/test/java/org/axonframework/commandhandling/CommandHandlingTest.java @@ -20,7 +20,6 @@ import org.axonframework.domain.StubAggregate; import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.EventProcessor; import org.axonframework.eventhandling.TrackedEventMessage; import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.EventSourcingRepository; @@ -36,6 +35,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,7 +98,7 @@ public Stream> readEvents(TrackingToken trackin } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusBenchmark.java b/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusBenchmark.java index bb64ae6e38..dd6960a7e7 100644 --- a/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusBenchmark.java +++ b/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusBenchmark.java @@ -24,7 +24,6 @@ import org.axonframework.common.Registration; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.EventProcessor; import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.GenericAggregateFactory; import org.axonframework.eventsourcing.GenericDomainEventMessage; @@ -40,6 +39,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -177,7 +177,7 @@ public void publish(List> events) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest_MultiThreaded.java b/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest_MultiThreaded.java index 93ccbbb173..ae83dd3cfb 100644 --- a/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest_MultiThreaded.java +++ b/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest_MultiThreaded.java @@ -26,7 +26,10 @@ import org.axonframework.commandhandling.model.Repository; import org.axonframework.common.Registration; import org.axonframework.domain.IdentifierFactory; -import org.axonframework.eventhandling.*; +import org.axonframework.eventhandling.AbstractEventBus; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.TrackedEventMessage; import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.GenericAggregateFactory; import org.axonframework.eventsourcing.annotation.AggregateIdentifier; @@ -47,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Stream; import static org.axonframework.commandhandling.GenericCommandMessage.asCommandMessage; @@ -296,7 +300,7 @@ public Stream> readEvents(TrackingToken trackin } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/org/axonframework/eventhandling/AbstractEventBusTest.java b/core/src/test/java/org/axonframework/eventhandling/AbstractEventBusTest.java index 3e213b2aa5..87bac466d5 100644 --- a/core/src/test/java/org/axonframework/eventhandling/AbstractEventBusTest.java +++ b/core/src/test/java/org/axonframework/eventhandling/AbstractEventBusTest.java @@ -27,6 +27,7 @@ import org.mockito.ArgumentCaptor; import java.util.*; +import java.util.function.Consumer; import java.util.function.Function; import static junit.framework.TestCase.assertEquals; @@ -216,7 +217,7 @@ private void onEvents(List> events) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/axonframework/eventhandling/SimpleEventBusTest.java b/core/src/test/java/org/axonframework/eventhandling/SimpleEventBusTest.java index 0b8c7f9803..bbae7d72b0 100644 --- a/core/src/test/java/org/axonframework/eventhandling/SimpleEventBusTest.java +++ b/core/src/test/java/org/axonframework/eventhandling/SimpleEventBusTest.java @@ -43,12 +43,12 @@ public void setUp() { @Test public void testEventIsDispatchedToSubscribedListeners() throws Exception { testSubject.publish(newEvent()); - testSubject.subscribe(listener1); + testSubject.subscribe(eventProcessor); // subscribing twice should not make a difference - Registration subscription1 = testSubject.subscribe(listener1); + Registration subscription1 = testSubject.subscribe(eventProcessor); testSubject.publish(newEvent()); - Registration subscription2 = testSubject.subscribe(listener2); - Registration subscription3 = testSubject.subscribe(listener3); + Registration subscription2 = testSubject.subscribe(eventProcessor); + Registration subscription3 = testSubject.subscribe(eventProcessor); testSubject.publish(newEvent()); subscription1.close(); testSubject.publish(newEvent()); diff --git a/core/src/test/java/org/axonframework/eventsourcing/CachingEventSourcingRepositoryTest.java b/core/src/test/java/org/axonframework/eventsourcing/CachingEventSourcingRepositoryTest.java index 2b2f8d9ddd..edc8691c8c 100644 --- a/core/src/test/java/org/axonframework/eventsourcing/CachingEventSourcingRepositoryTest.java +++ b/core/src/test/java/org/axonframework/eventsourcing/CachingEventSourcingRepositoryTest.java @@ -29,7 +29,6 @@ import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.EventProcessor; import org.axonframework.eventstore.EventStore; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.DefaultUnitOfWork; @@ -43,6 +42,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -226,7 +226,7 @@ protected void commit(List> events) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/axonframework/testutils/RecordingEventStore.java b/core/src/test/java/org/axonframework/testutils/RecordingEventStore.java index 811e669d2f..c38e157140 100644 --- a/core/src/test/java/org/axonframework/testutils/RecordingEventStore.java +++ b/core/src/test/java/org/axonframework/testutils/RecordingEventStore.java @@ -32,6 +32,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -68,7 +69,7 @@ public void publish(List> events) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { subscriptions.add(eventProcessor); return () -> subscriptions.remove(eventProcessor); } diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/loopbacktest/synchronous/SynchronousLoopbackTest.java b/integrationtests/src/test/java/org/axonframework/integrationtests/loopbacktest/synchronous/SynchronousLoopbackTest.java index 9d203f691b..483b287e61 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/loopbacktest/synchronous/SynchronousLoopbackTest.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/loopbacktest/synchronous/SynchronousLoopbackTest.java @@ -27,8 +27,10 @@ import org.axonframework.commandhandling.model.Repository; import org.axonframework.common.lock.LockFactory; import org.axonframework.common.lock.PessimisticLockFactory; -import org.axonframework.eventhandling.*; +import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventListener; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.SimpleEventBus; import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.EventSourcingRepository; import org.axonframework.eventsourcing.GenericDomainEventMessage; @@ -120,7 +122,7 @@ public void testLoopBackKeepsProperEventOrder_PessimisticLocking() { } } }; - eventBus.subscribe(new SimpleEventProcessor("test", el)); + eventBus.subscribe(eventProcessor); commandBus.dispatch(asCommandMessage(new ChangeCounterCommand(aggregateIdentifier, 1)), reportErrorCallback); @@ -158,7 +160,7 @@ public void testLoopBackKeepsProperEventOrder_PessimisticLocking_ProcessingFails } } }; - eventBus.subscribe(new SimpleEventProcessor("test", el)); + eventBus.subscribe(eventProcessor); commandBus.dispatch(asCommandMessage(new ChangeCounterCommand(aggregateIdentifier, 1)), expectErrorCallback); diff --git a/quickstart/src/main/java/org/axonframework/quickstart/RunAnnotatedAggregate.java b/quickstart/src/main/java/org/axonframework/quickstart/RunAnnotatedAggregate.java index 4559f05e4d..7e23b0e9ca 100644 --- a/quickstart/src/main/java/org/axonframework/quickstart/RunAnnotatedAggregate.java +++ b/quickstart/src/main/java/org/axonframework/quickstart/RunAnnotatedAggregate.java @@ -23,13 +23,10 @@ import org.axonframework.commandhandling.gateway.DefaultCommandGateway; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.SimpleEventBus; -import org.axonframework.eventhandling.SimpleEventProcessor; -import org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter; import org.axonframework.eventsourcing.EventSourcingRepository; import org.axonframework.eventstore.EventStore; import org.axonframework.eventstore.fs.FileSystemEventStore; import org.axonframework.eventstore.fs.SimpleEventFileResolver; -import org.axonframework.quickstart.annotated.ToDoEventHandler; import org.axonframework.quickstart.annotated.ToDoItem; import java.io.File; @@ -63,7 +60,7 @@ public static void main(String[] args) { new AggregateAnnotationCommandHandler<>(ToDoItem.class, repository).subscribe(commandBus); // We register an event listener to see which events are created - eventBus.subscribe(new SimpleEventProcessor("logging", new AnnotationEventListenerAdapter(new ToDoEventHandler()))); + eventBus.subscribe(eventProcessor); // and let's send some Commands on the CommandBus. CommandGenerator.sendCommands(commandGateway); diff --git a/quickstart/src/main/java/org/axonframework/quickstart/RunBasicCommandHandling.java b/quickstart/src/main/java/org/axonframework/quickstart/RunBasicCommandHandling.java index bbaf3e4a07..f19266cb25 100644 --- a/quickstart/src/main/java/org/axonframework/quickstart/RunBasicCommandHandling.java +++ b/quickstart/src/main/java/org/axonframework/quickstart/RunBasicCommandHandling.java @@ -22,13 +22,10 @@ import org.axonframework.commandhandling.gateway.DefaultCommandGateway; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.SimpleEventBus; -import org.axonframework.eventhandling.SimpleEventProcessor; -import org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter; import org.axonframework.eventsourcing.EventSourcingRepository; import org.axonframework.eventstore.EventStore; import org.axonframework.eventstore.fs.FileSystemEventStore; import org.axonframework.eventstore.fs.SimpleEventFileResolver; -import org.axonframework.quickstart.annotated.ToDoEventHandler; import org.axonframework.quickstart.api.CreateToDoItemCommand; import org.axonframework.quickstart.api.MarkCompletedCommand; import org.axonframework.quickstart.handler.CreateToDoCommandHandler; @@ -68,7 +65,7 @@ public static void main(String[] args) { new MarkCompletedCommandHandler(repository)); // We register an event listener to see which events are created - eventBus.subscribe(new SimpleEventProcessor("handler", new AnnotationEventListenerAdapter(new ToDoEventHandler()))); + eventBus.subscribe(eventProcessor); // and let's send some Commands on the CommandBus using the special runner configured with our CommandGateway. CommandGenerator.sendCommands(commandGateway); diff --git a/quickstart/src/main/java/org/axonframework/quickstart/RunDisruptorCommandBus.java b/quickstart/src/main/java/org/axonframework/quickstart/RunDisruptorCommandBus.java index 04bb1cef4e..44639018a8 100644 --- a/quickstart/src/main/java/org/axonframework/quickstart/RunDisruptorCommandBus.java +++ b/quickstart/src/main/java/org/axonframework/quickstart/RunDisruptorCommandBus.java @@ -23,13 +23,10 @@ import org.axonframework.commandhandling.model.Repository; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.SimpleEventBus; -import org.axonframework.eventhandling.SimpleEventProcessor; -import org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter; import org.axonframework.eventsourcing.GenericAggregateFactory; import org.axonframework.eventstore.EventStore; import org.axonframework.eventstore.fs.FileSystemEventStore; import org.axonframework.eventstore.fs.SimpleEventFileResolver; -import org.axonframework.quickstart.annotated.ToDoEventHandler; import org.axonframework.quickstart.annotated.ToDoItem; import java.io.File; @@ -51,7 +48,7 @@ public static void main(String[] args) throws InterruptedException { // TODO: Listeners should subscribe to Event Store instead of Event Bus // we register the event handlers - eventBus.subscribe(new SimpleEventProcessor("handler", new AnnotationEventListenerAdapter(new ToDoEventHandler()))); + eventBus.subscribe(eventProcessor); // we use default settings for the disruptor command bus DisruptorCommandBus commandBus = new DisruptorCommandBus(eventStore, eventBus); diff --git a/quickstart/src/main/java/org/axonframework/quickstart/RunSaga.java b/quickstart/src/main/java/org/axonframework/quickstart/RunSaga.java index 7eebe74ce0..7c2186c25b 100644 --- a/quickstart/src/main/java/org/axonframework/quickstart/RunSaga.java +++ b/quickstart/src/main/java/org/axonframework/quickstart/RunSaga.java @@ -23,7 +23,6 @@ import org.axonframework.commandhandling.gateway.DefaultCommandGateway; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.SimpleEventBus; -import org.axonframework.eventhandling.SimpleEventProcessor; import org.axonframework.eventhandling.scheduling.EventScheduler; import org.axonframework.eventhandling.scheduling.java.SimpleEventScheduler; import org.axonframework.messaging.unitofwork.UnitOfWork; @@ -85,7 +84,7 @@ public static void main(String[] args) throws InterruptedException { AnnotatedSagaManager sagaManager = new AnnotatedSagaManager(sagaRepository, sagaFactory, ToDoSaga.class); // and we need to subscribe the Saga Manager to the Event Bus - eventBus.subscribe(new SimpleEventProcessor("saga", sagaManager)); + eventBus.subscribe(eventProcessor); // That's the infrastructure we need... // Let's pretend a few things are happening diff --git a/spring/src/main/java/org/axonframework/spring/config/EventListenerSubscriber.java b/spring/src/main/java/org/axonframework/spring/config/EventListenerSubscriber.java index 22d8e6695d..abe6f7ac3a 100644 --- a/spring/src/main/java/org/axonframework/spring/config/EventListenerSubscriber.java +++ b/spring/src/main/java/org/axonframework/spring/config/EventListenerSubscriber.java @@ -85,7 +85,8 @@ public void start() { } if (subscribeEventProcessorsToEventBus) { - applicationContext.getBeansOfType(EventProcessor.class).values().forEach(eventBus::subscribe); + applicationContext.getBeansOfType(EventProcessor.class).values().forEach( + (eventProcessor) -> eventBus.subscribe(eventProcessor)); } this.started = true; } @@ -95,7 +96,7 @@ private EventProcessor selectEventProcessor(EventListener bean) { if (eventProcessorSelectors.isEmpty()) { Map eventProcessors = applicationContext.getBeansOfType(EventProcessor.class); if (eventProcessors.isEmpty()) { - eventBus.subscribe(defaultEventProcessor); + eventBus.subscribe(eventProcessor); return defaultEventProcessor; } else if (eventProcessors.size() == 1) { return eventProcessors.values().iterator().next(); diff --git a/spring/src/main/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapter.java b/spring/src/main/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapter.java index a5bdb1193b..fbb9c1e5f4 100644 --- a/spring/src/main/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapter.java +++ b/spring/src/main/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapter.java @@ -19,7 +19,6 @@ import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventListener; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.SimpleEventProcessor; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.messaging.MessageChannel; @@ -75,7 +74,7 @@ public EventListeningMessageChannelAdapter(EventBus eventBus, MessageChannel cha */ @Override public void afterPropertiesSet() { - eventBus.subscribe(new SimpleEventProcessor(beanName, this)); + eventBus.subscribe(eventProcessor); } /** diff --git a/spring/src/main/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBus.java b/spring/src/main/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBus.java index cb9e1c94ab..34d726a96c 100644 --- a/spring/src/main/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBus.java +++ b/spring/src/main/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBus.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; /** * {@link org.axonframework.eventhandling.EventBus} implementation that delegates all subscription and publishing @@ -58,7 +59,7 @@ protected void prepareCommit(List> events) { } @Override - public Registration subscribe(EventProcessor eventProcessor) { + public Registration subscribe(Consumer>> eventProcessor) { MessageHandler messagehandler = new MessageHandlerAdapter(eventProcessor); MessageHandler oldHandler = handlers.putIfAbsent(eventProcessor, messagehandler); if (oldHandler == null) { diff --git a/spring/src/test/java/org/axonframework/spring/config/MessageHandlerSubscriberDefinitionRegistrarTest.java b/spring/src/test/java/org/axonframework/spring/config/MessageHandlerSubscriberDefinitionRegistrarTest.java index aede9862bf..f8e3a38e0d 100644 --- a/spring/src/test/java/org/axonframework/spring/config/MessageHandlerSubscriberDefinitionRegistrarTest.java +++ b/spring/src/test/java/org/axonframework/spring/config/MessageHandlerSubscriberDefinitionRegistrarTest.java @@ -43,7 +43,7 @@ public class MessageHandlerSubscriberDefinitionRegistrarTest { public void testHandlersRegisteredToEventBus() throws Exception { assertNotNull(eventBus); verify(eventBus).subscribe(eventProcessor); - verify(eventBus2, never()).subscribe(any()); + verify(eventBus2, never()).subscribe(eventProcessor); verify(eventProcessor).subscribe(eventListener); verify(commandBus).subscribe(eq(String.class.getName()), eq(annotationCommandHandler)); } diff --git a/spring/src/test/java/org/axonframework/spring/config/xml/SagaManagerBeanDefinitionParserTest.java b/spring/src/test/java/org/axonframework/spring/config/xml/SagaManagerBeanDefinitionParserTest.java index 7efd9356b3..01b74f488e 100644 --- a/spring/src/test/java/org/axonframework/spring/config/xml/SagaManagerBeanDefinitionParserTest.java +++ b/spring/src/test/java/org/axonframework/spring/config/xml/SagaManagerBeanDefinitionParserTest.java @@ -2,8 +2,8 @@ import org.axonframework.eventhandling.EventBus; import org.axonframework.saga.SagaManager; -import org.junit.*; -import org.junit.runner.*; +import org.junit.Test; +import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; @@ -12,7 +12,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.*; /** @@ -35,7 +35,7 @@ public class SagaManagerBeanDefinitionParserTest { @Test public void testSagaManagerSubscribedToEventBus() throws Exception { - verify(mockEventBus, times(2)).subscribe(any()); + verify(mockEventBus, times(2)).subscribe(eventProcessor); assertNotNull(explicitManager.getTargetType()); assertNotNull(autowiredManager.getTargetType()); diff --git a/spring/src/test/java/org/axonframework/spring/eventhandling/scheduling/quartz/QuartzSagaTimerIntegrationTest.java b/spring/src/test/java/org/axonframework/spring/eventhandling/scheduling/quartz/QuartzSagaTimerIntegrationTest.java index f544c17e1e..9617b3da97 100644 --- a/spring/src/test/java/org/axonframework/spring/eventhandling/scheduling/quartz/QuartzSagaTimerIntegrationTest.java +++ b/spring/src/test/java/org/axonframework/spring/eventhandling/scheduling/quartz/QuartzSagaTimerIntegrationTest.java @@ -19,7 +19,6 @@ import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventListener; import org.axonframework.eventhandling.GenericEventMessage; -import org.axonframework.eventhandling.SimpleEventProcessor; import org.axonframework.saga.AssociationValue; import org.axonframework.saga.SagaRepository; import org.axonframework.spring.eventhandling.scheduling.SimpleTimingSaga; @@ -111,7 +110,7 @@ public void jobWasExecuted(JobExecutionContext context, JobExecutionException jo assertNotNull(eventBus); final String randomAssociationValue = UUID.randomUUID().toString(); EventListener listener = mock(EventListener.class); - eventBus.subscribe(new SimpleEventProcessor("quartz", listener)); + eventBus.subscribe(eventProcessor); new TransactionTemplate(transactionManager) .execute(new TransactionCallbackWithoutResult() { diff --git a/spring/src/test/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapterTest.java b/spring/src/test/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapterTest.java index d52f22e29d..0187328178 100644 --- a/spring/src/test/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapterTest.java +++ b/spring/src/test/java/org/axonframework/spring/messaging/adapter/EventListeningMessageChannelAdapterTest.java @@ -26,13 +26,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * @author Allard Buijze @@ -63,9 +57,9 @@ public void testMessageForwardedToChannel() { @Test public void testEventListenerRegisteredOnInit() throws Exception { - verify(mockEventBus, never()).subscribe(any()); + verify(mockEventBus, never()).subscribe(eventProcessor); testSubject.afterPropertiesSet(); - verify(mockEventBus).subscribe(any()); + verify(mockEventBus).subscribe(eventProcessor); } @SuppressWarnings({"unchecked"}) diff --git a/spring/src/test/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBusTest.java b/spring/src/test/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBusTest.java index 051caa1ddc..925fd325d2 100644 --- a/spring/src/test/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBusTest.java +++ b/spring/src/test/java/org/axonframework/spring/messaging/eventbus/SpringMessagingEventBusTest.java @@ -48,14 +48,14 @@ public void setUp() { @Test public void testSubscribeListener() { - testSubject.subscribe(mockEventProcessor); + testSubject.subscribe(eventProcessor); verify(mockChannel).subscribe(isA(MessageHandler.class)); } @Test public void testUnsubscribeListener() throws Exception { - Registration subscription = testSubject.subscribe(mockEventProcessor); + Registration subscription = testSubject.subscribe(eventProcessor); subscription.close(); verify(mockChannel).unsubscribe(isA(MessageHandler.class)); @@ -63,7 +63,7 @@ public void testUnsubscribeListener() throws Exception { @Test public void testUnsubscribeListener_UnsubscribedTwice() throws Exception { - Registration subscription = testSubject.subscribe(mockEventProcessor); + Registration subscription = testSubject.subscribe(eventProcessor); subscription.close(); subscription.close(); @@ -73,8 +73,8 @@ public void testUnsubscribeListener_UnsubscribedTwice() throws Exception { @Test public void testSubscribeListener_SubscribedTwice() { - testSubject.subscribe(mockEventProcessor); - testSubject.subscribe(mockEventProcessor); + testSubject.subscribe(eventProcessor); + testSubject.subscribe(eventProcessor); verify(mockChannel).subscribe(isA(MessageHandler.class)); } diff --git a/test/src/main/java/org/axonframework/test/GivenWhenThenTestFixture.java b/test/src/main/java/org/axonframework/test/GivenWhenThenTestFixture.java index 6ec0c758ba..c8cadf8e93 100644 --- a/test/src/main/java/org/axonframework/test/GivenWhenThenTestFixture.java +++ b/test/src/main/java/org/axonframework/test/GivenWhenThenTestFixture.java @@ -28,7 +28,6 @@ import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.EventProcessor; import org.axonframework.eventsourcing.AggregateFactory; import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.EventSourcingRepository; @@ -52,6 +51,7 @@ import java.lang.reflect.Modifier; import java.util.*; import java.util.concurrent.Callable; +import java.util.function.Consumer; import static java.lang.String.format; import static java.util.stream.Collectors.toList; @@ -528,7 +528,7 @@ protected void prepareCommit(List> events) { } @Override - public Registration subscribe(EventProcessor eventListener) { + public Registration subscribe(Consumer>> eventProcessor) { return () -> true; } } diff --git a/test/src/main/java/org/axonframework/test/saga/EventValidator.java b/test/src/main/java/org/axonframework/test/saga/EventValidator.java index eb1c5400df..99809c9743 100644 --- a/test/src/main/java/org/axonframework/test/saga/EventValidator.java +++ b/test/src/main/java/org/axonframework/test/saga/EventValidator.java @@ -19,7 +19,6 @@ import org.axonframework.eventhandling.EventBus; import org.axonframework.eventhandling.EventListener; import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.SimpleEventProcessor; import org.axonframework.test.AxonAssertionError; import org.axonframework.test.matchers.FieldFilter; import org.hamcrest.Matcher; @@ -96,7 +95,7 @@ public void handle(EventMessage event) { * Starts recording event published by the event bus. */ public void startRecording() { - eventBus.subscribe(new SimpleEventProcessor("recorder", this)); + eventBus.subscribe(eventProcessor); } @SuppressWarnings({"unchecked"})