From 4887c895a32c2debebb56682f92fb820fd2d7510 Mon Sep 17 00:00:00 2001 From: Christophe Bouhier Date: Wed, 19 Apr 2017 15:51:36 +0200 Subject: [PATCH] Issue #221 added multithreading tests --- ...MessageSequenceNumberSequentialPolicy.java | 24 ++ .../SegmentingPerAggregatePolicy.java | 29 -- .../eventhandling/TrackingEventProcessor.java | 140 ++++--- .../eventhandling/SegmentTest.java | 8 +- .../TrackingEventMultiProcessorTest.java | 390 ++++++++++++++++++ .../TrackingEventProcessorTest.java | 46 +-- 6 files changed, 508 insertions(+), 129 deletions(-) create mode 100644 core/src/main/java/org/axonframework/eventhandling/MessageSequenceNumberSequentialPolicy.java delete mode 100644 core/src/main/java/org/axonframework/eventhandling/SegmentingPerAggregatePolicy.java create mode 100644 core/src/test/java/org/axonframework/eventhandling/TrackingEventMultiProcessorTest.java diff --git a/core/src/main/java/org/axonframework/eventhandling/MessageSequenceNumberSequentialPolicy.java b/core/src/main/java/org/axonframework/eventhandling/MessageSequenceNumberSequentialPolicy.java new file mode 100644 index 0000000000..6e391d8ee5 --- /dev/null +++ b/core/src/main/java/org/axonframework/eventhandling/MessageSequenceNumberSequentialPolicy.java @@ -0,0 +1,24 @@ +package org.axonframework.eventhandling; + +import org.axonframework.eventhandling.async.SequencingPolicy; +import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy; +import org.axonframework.eventsourcing.DomainEventMessage; + +import java.util.Objects; +import java.util.UUID; + +/** + * A policy which guarantees a unique Identifier. + * + * @author Christophe Bouhier + */ +public class MessageSequenceNumberSequentialPolicy implements SequencingPolicy { + + @Override + public Object getSequenceIdentifierFor(EventMessage event) { + if(event instanceof DomainEventMessage){ + return ((DomainEventMessage) event).getSequenceNumber(); + } + return null; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/axonframework/eventhandling/SegmentingPerAggregatePolicy.java b/core/src/main/java/org/axonframework/eventhandling/SegmentingPerAggregatePolicy.java deleted file mode 100644 index cc3a072f21..0000000000 --- a/core/src/main/java/org/axonframework/eventhandling/SegmentingPerAggregatePolicy.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.axonframework.eventhandling; - -import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy; - -import java.util.Objects; -import java.util.UUID; - -/** - * @author Christophe Bouhier - */ -public class SegmentingPerAggregatePolicy extends SequentialPerAggregatePolicy { - - protected long toLong(String uuidAsString) { - final UUID uuid = UUID.fromString(uuidAsString); - return uuid.getLeastSignificantBits(); - } - - public boolean matches(Segment segment, TrackedEventMessage trackedEventMessage) { - final Object sequenceIdentifierFor = getSequenceIdentifierFor(trackedEventMessage); - if (Objects.nonNull(sequenceIdentifierFor)) { - if (sequenceIdentifierFor instanceof String) { - final long segmentIdentifierAsLong = toLong((String) sequenceIdentifierFor); - return segment.matches(segmentIdentifierAsLong); - } - } - // when null, or when not matching the provided segment. - return false; - } -} diff --git a/core/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java b/core/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java index e2488f9d72..b380462d26 100644 --- a/core/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java +++ b/core/src/main/java/org/axonframework/eventhandling/TrackingEventProcessor.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import static java.util.Objects.requireNonNull; import static org.axonframework.common.io.IOUtils.closeQuietly; @@ -113,7 +114,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok StreamableMessageSource> messageSource, TokenStore tokenStore, TransactionManager transactionManager, int batchSize) { this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, - messageSource, tokenStore, new SegmentingPerAggregatePolicy(), transactionManager, batchSize, NoOpMessageMonitor.INSTANCE); + messageSource, tokenStore, new SequentialPerAggregatePolicy(), transactionManager, batchSize, NoOpMessageMonitor.INSTANCE); } /** @@ -135,7 +136,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok TransactionManager transactionManager, MessageMonitor> messageMonitor) { this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, - messageSource, tokenStore, new SegmentingPerAggregatePolicy(), transactionManager, 1, messageMonitor); + messageSource, tokenStore, new SequentialPerAggregatePolicy(), transactionManager, 1, messageMonitor); } /** @@ -149,7 +150,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok * @param messageSource The message source (e.g. Event Bus) which this event processor will track * @param tokenStore Used to store and fetch event tokens that enable the processor to track its * progress - * @param sequentialPolicy + * @param sequentialPolicy The policy which will determine the segmentation identifier. * @param transactionManager The transaction manager used when processing messages * @param batchSize The maximum number of events to process in a single batch * @param messageMonitor Monitor to be invoked before and after event processing @@ -183,16 +184,6 @@ public void start() { if (!previousState.isRunning()) { ensureRunningExecutor(); processingStrategy.startSegmentWorkers(); - // Delegate to strategy to dispatch workers. - -// executorService.submit(() -> { -// try { -// this.processingLoop(); -// } catch (Throwable e) { -// logger.error("Processing loop ended due to uncaught exception. Processor pausing.", e); -// state.set(State.PAUSED_ERROR); -// } -// }); } } @@ -217,16 +208,6 @@ protected void processingLoop(TrackingSegmentWorker segmentWorker) { long errorWaitTime = 1; try { while (state.get().isRunning()) { - - try{ - if(segmentWorker.isDispatcher()){ - processingStrategy.optimizeWorkers(); - } - }catch(Exception e){ - - } - - try { eventStream = ensureEventStreamOpened(eventStream); processBatch(segmentWorker, eventStream); @@ -245,7 +226,6 @@ protected void processingLoop(TrackingSegmentWorker segmentWorker) { } catch (InterruptedException e1) { Thread.currentThread().interrupt(); logger.warn("Thread interrupted. Preparing to shut down event processor"); - // TODO, when in a worker thread, we can't shutdown the whole processing... shutDown(); } errorWaitTime = Math.min(errorWaitTime * 2, 60); @@ -266,18 +246,15 @@ private void releaseToken(Segment segment) { } private void processBatch(TrackingSegmentWorker segmentWorker, MessageStream> eventStream) throws Exception { + + final Predicate> trackedEventMessagePredicate = matchesSegmentPredicate(segmentWorker.getSegment()); + List> batch = new ArrayList<>(); try { if (eventStream.hasNextAvailable(1, TimeUnit.SECONDS)) { while (batch.size() < batchSize && eventStream.hasNextAvailable()) { - final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); - - if (sequentialPolicy instanceof SegmentingPerAggregatePolicy) { - if (((SegmentingPerAggregatePolicy) sequentialPolicy).matches(segmentWorker.getSegment(), trackedEventMessage)) { - batch.add(trackedEventMessage); - } - } else { + if (trackedEventMessagePredicate.test(trackedEventMessage)) { batch.add(trackedEventMessage); } } @@ -293,8 +270,10 @@ private void processBatch(TrackingSegmentWorker segmentWorker, MessageStream lastToken.equals(event.trackingToken())).isPresent()) { - // TODO, support sequential policy here as well. - batch.add(eventStream.nextAvailable()); + final TrackedEventMessage trackedEventMessage = eventStream.nextAvailable(); + if (trackedEventMessagePredicate.test(trackedEventMessage)) { + batch.add(trackedEventMessage); + } } process(batch); @@ -302,11 +281,37 @@ private void processBatch(TrackingSegmentWorker segmentWorker, MessageStreamtrue when the tested {@link TrackedEventMessage} matches with the given {@link Segment}. + *
+ * Implementation note: + * This policy matches sequence identifier of the message with the segment. + *
+ * + * @param segment The {@link Segment#matches(long)} method is called. + * @return + */ + protected Predicate> matchesSegmentPredicate(Segment segment) { + + return message -> { + final Object sequenceIdentifierFor = sequentialPolicy.getSequenceIdentifierFor(message); + if (Objects.nonNull(sequenceIdentifierFor)) { + final Long numericIdentifier = (long) Objects.hashCode(sequenceIdentifierFor); + final boolean matches = segment.matches(numericIdentifier); + logger.info("{} policy for segment: {}, with identifier (binary): {}", matches ? "Matching" : "Not Matching", + segment.getSegmentId(), Integer.toBinaryString(numericIdentifier.intValue())); + return matches; + } + // when null, or when not matching the provided segment. + // Review: When segment ID is 0, consider returning true? + return false; + }; + } + private MessageStream> ensureEventStreamOpened( MessageStream> eventStreamIn) { MessageStream> eventStream = eventStreamIn; @@ -426,9 +431,7 @@ public void setLastToken(TrackingToken lastToken) { public void run() { try { processingLoop(this); -// System.out.println("processing loop done"); } catch (Throwable e) { - // TODO state change on a single worker... logger.error("Processing loop ended due to uncaught exception. Processor pausing.", e); state.set(State.PAUSED_ERROR); } @@ -455,6 +458,8 @@ public boolean isDispatcher() { public class AsyncTrackingEventProcessingStrategy { int[] storedSegments; + boolean threadPoolChanged = false; + public void startSegmentWorkers() { @@ -464,45 +469,74 @@ public void startSegmentWorkers() { // TODO, algo to determine the segmentation, We have 3 possible cases. // 1. segment == thread max pool size => One thread per segment. // 2. segment > thread max pool size => handle multiple identifiers.. - // 3. segment < thread max pool size => Start splitting segments, accross number of available threads. + // 3. segment < thread max pool size => Start splitting segments, across number of available threads. // Note, when the SequentialPolicy is not capable to distinguish messages, there is no point in segmenting the stream of messages! // Currently these two constructs are not synched. - executorService.getCorePoolSize(); - storedSegments = tokenStore.fetchSegments(getName()); Segment[] segments = Segment.computeSegments(storedSegments); - if(segments.length == 1 && (threadsRemaining() + 1) > segments.length){ - // Split our segment according to the thread count. + // Split the root segment in at least two segments. + if (segments.length == 1 && (threadsRemaining() + 1) > segments.length) { segments = segments[0].split(); } + // Optimize the segments, for the number of threads. + // Note this could be done when running... + if (segments.length > threadsRemaining()) { + // TODO wait for functionality to compare token 'distance'. + // when two tokens are 'adjacent' within the boundaries of the Segment mask, these would be mergeable. + // The worker handling the most recent token, would wait until a candidate merge worker is 'adjacent'. + + // See which segments are mergeable. +// final List sortedByIndexTokens = Stream.of(segments) +// .map(segment -> tokenStore.fetchToken(getName(), segment.getSegmentId())) +// .map(trackingToken -> { +// long index = 0; +// if (trackingToken instanceof GlobalSequenceTrackingToken) { +// index = ((GlobalSequenceTrackingToken) trackingToken).getGlobalIndex(); +// }else if(trackingToken instanceof GapAwareTrackingToken){ +// index = ((GapAwareTrackingToken) trackingToken).getIndex(); +// } +// return index; +// }) +// .sorted() +// .collect(Collectors.toList()); + } + TrackingSegmentWorker workingInCurrentThread = null; for (Segment s : segments) { - final TrackingSegmentWorker trackingSegmentWorker = new TrackingSegmentWorker(s); + TrackingSegmentWorker trackingSegmentWorker = new TrackingSegmentWorker(s); + // TODO Do we de-register properly.... registerInterceptor(trackingSegmentWorker.getEventMessageMessageHandlerInterceptor()); if (threadsRemaining() > 0) { executorService.submit(trackingSegmentWorker); } else { - // Our dispatcher becomes a worker....no more dispatching.... - trackingSegmentWorker.run(); + workingInCurrentThread = trackingSegmentWorker; + break; + // Our dispatcher becomes a worker....no more dispatching....which lead to segments not being dispatched, segment > threads. } } + if(Objects.nonNull(workingInCurrentThread)){ + workingInCurrentThread.run(); + } + logger.info("Exiting dispatching thread"); }); } - public int threadsRemaining(){ + public int threadsRemaining() { return executorService.getMaximumPoolSize() - executorService.getActiveCount(); } - public void optimizeWorkers(){ - - final int activeCount = executorService.getActiveCount(); - if( storedSegments.length < activeCount){ - // stop our execution, and re-init with the new executor settings. (core and max pool size). - executorService.shutdown();; - ensureRunningExecutor(); - // do this stuff later. + public void optimizeWorkers() { + if (threadPoolChanged) { + // Any reason... why we should optimize, anything changed to our thread pool? + final int activeCount = executorService.getActiveCount(); + if (storedSegments.length < activeCount) { + // stop our execution, and re-init with the new executor settings. (core and max pool size). + executorService.shutdown(); + ensureRunningExecutor(); + // do this stuff later. + } } } } diff --git a/core/src/test/java/org/axonframework/eventhandling/SegmentTest.java b/core/src/test/java/org/axonframework/eventhandling/SegmentTest.java index 075bed39e8..7b0ebb16c1 100644 --- a/core/src/test/java/org/axonframework/eventhandling/SegmentTest.java +++ b/core/src/test/java/org/axonframework/eventhandling/SegmentTest.java @@ -39,19 +39,19 @@ public class SegmentTest { private static final Logger LOG = LoggerFactory.getLogger(SegmentTest.class); private List domainEventMessages; - private SegmentingPerAggregatePolicy sequentialPolicy; @Before public void before() { domainEventMessages = produceEvents(); - - sequentialPolicy = new SegmentingPerAggregatePolicy(); } @Test public void testSegmentSplitAddsUp() { - final List identifiers = domainEventMessages.stream().map(de -> sequentialPolicy.toLong(de.getAggregateIdentifier())).collect(Collectors.toList()); + final List identifiers = domainEventMessages.stream().map(de -> { + final String aggregateIdentifier = de.getAggregateIdentifier(); + return UUID.fromString(aggregateIdentifier).getLeastSignificantBits(); + }).collect(Collectors.toList()); // segment 0, mask 0; final long count = identifiers.stream().filter(Segment.ROOT_SEGMENT::matches).count(); diff --git a/core/src/test/java/org/axonframework/eventhandling/TrackingEventMultiProcessorTest.java b/core/src/test/java/org/axonframework/eventhandling/TrackingEventMultiProcessorTest.java new file mode 100644 index 0000000000..23cb4a5cff --- /dev/null +++ b/core/src/test/java/org/axonframework/eventhandling/TrackingEventMultiProcessorTest.java @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2010-2017. Axon Framework + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.eventhandling; + +import org.axonframework.common.MockException; +import org.axonframework.common.transaction.NoTransactionManager; +import org.axonframework.eventhandling.tokenstore.TokenStore; +import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore; +import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; +import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken; +import org.axonframework.eventsourcing.eventstore.TrackingEventStream; +import org.axonframework.eventsourcing.eventstore.TrackingToken; +import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; +import org.axonframework.messaging.unitofwork.RollbackConfigurationType; +import org.axonframework.monitoring.NoOpMessageMonitor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.theories.Theories; +import org.junit.runner.RunWith; +import org.springframework.test.annotation.DirtiesContext; + +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static junit.framework.TestCase.*; +import static org.axonframework.eventsourcing.eventstore.EventStoreTestUtils.createEvent; +import static org.axonframework.eventsourcing.eventstore.EventStoreTestUtils.createEvents; +import static org.axonframework.eventsourcing.eventstore.EventUtils.asTrackedEventMessage; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.*; + +/** + * @author Christophe Bouhier + */ +public class TrackingEventMultiProcessorTest { + + private TrackingEventProcessor testSubject; + private EmbeddedEventStore eventBus; + private TokenStore tokenStore; + private EventHandlerInvoker eventHandlerInvoker; + private EventListener mockListener; + + private static TrackingEventStream trackingEventStreamOf(Iterator> iterator) { + return new TrackingEventStream() { + private boolean hasPeeked; + private TrackedEventMessage peekEvent; + + @Override + public Optional> peek() { + if (!hasPeeked) { + if (!hasNextAvailable()) { + return Optional.empty(); + } + peekEvent = iterator.next(); + hasPeeked = true; + } + return Optional.of(peekEvent); + } + + @Override + public boolean hasNextAvailable(int timeout, TimeUnit unit) { + return hasPeeked || iterator.hasNext(); + } + + @Override + public TrackedEventMessage nextAvailable() { + if (!hasPeeked) { + return iterator.next(); + } + TrackedEventMessage result = peekEvent; + peekEvent = null; + hasPeeked = false; + return result; + } + + @Override + public void close() { + + } + }; + } + + @Before + public void setUp() throws Exception { + tokenStore = spy(new InMemoryTokenStore()); + mockListener = mock(EventListener.class); + eventHandlerInvoker = new SimpleEventHandlerInvoker(mockListener); + eventBus = new EmbeddedEventStore(new InMemoryEventStorageEngine()); + + // A processor, with a policy which guarantees segmenting with + testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, + eventBus, tokenStore, new MessageSequenceNumberSequentialPolicy(), NoTransactionManager.INSTANCE, 1, NoOpMessageMonitor.INSTANCE); + testSubject.tweakThreadPool(2, 2); + } + + @After + public void tearDown() throws Exception { + testSubject.shutDown(); + eventBus.shutDown(); + } + + @Test + @DirtiesContext + public void testProcessorWorkerCount() throws InterruptedException { + testSubject.start(); + // give it some time to split segments from the store and submit to executor service. + Thread.sleep(200); + assertThat(testSubject.activeProcessorThreads(), is(2)); + } + + @Test + @DirtiesContext + public void testProcessorWorkerCountWithMultipleSegments() throws InterruptedException { + + tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0); + tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1); + + testSubject.start(); + // give it some time to split segments from the store and submit to executor service. + Thread.sleep(200); + assertThat(testSubject.activeProcessorThreads(), is(2)); + } + + @Test + @DirtiesContext + public void testProcessorWorkerCountWithMultipleSegmentsWithOneThread() throws InterruptedException { + + tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0); + tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1); + + testSubject.tweakThreadPool(1,1); + testSubject.start(); + + // give it some time to split segments from the store and submit to executor service. + Thread.sleep(200); + assertThat(testSubject.activeProcessorThreads(), is(1)); + } + + + @Test + @DirtiesContext + public void testMultiThreadedPublishedEventsGetPassedToListener() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(2); + final AcknowledgeByThread acknowledgeByThread = new AcknowledgeByThread(); + doAnswer(invocation -> { + acknowledgeByThread.addMessage(Thread.currentThread(),(EventMessage) invocation.getArguments()[0]); + countDownLatch.countDown(); + return null; + }).when(mockListener).handle(any()); + testSubject.start(); + eventBus.publish(createEvents(2)); + assertTrue("Expected listener to have received 2 published events", countDownLatch.await(5, TimeUnit.SECONDS)); + acknowledgeByThread.assertEventsAckedByMultipleThreads(); + acknowledgeByThread.assertEventsAddUpTo(2); + } + + @Test + @DirtiesContext + public void testMultiThreadedTokenIsStoredWhenEventIsRead() throws Exception { + + CountDownLatch countDownLatch = new CountDownLatch(2); + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCleanup(uow -> countDownLatch.countDown()); + return interceptorChain.proceed(); + })); + testSubject.start(); + eventBus.publish(createEvents(2)); + assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5, TimeUnit.SECONDS)); +// verify(tokenStore).storeToken(any(), any(), anyInt()); + IntStream.of(tokenStore.fetchSegments("test")).forEach(System.out::println); + assertThat(tokenStore.fetchToken(testSubject.getName(), 0), notNullValue()); + assertThat(tokenStore.fetchToken(testSubject.getName(), 1), notNullValue()); + } + + @Test + @DirtiesContext + public void testMultiThreadedTokenIsNotStoredWhenUnitOfWorkIsRolledBack() throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(1); + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCommit(uow -> { + throw new MockException(); + }); + return interceptorChain.proceed(); + })); + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCleanup(uow -> countDownLatch.countDown()); + return interceptorChain.proceed(); + })); + testSubject.start(); + eventBus.publish(createEvent()); + assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5, TimeUnit.SECONDS)); + assertNull(tokenStore.fetchToken(testSubject.getName(), 0)); + assertNull(tokenStore.fetchToken(testSubject.getName(), 1)); + } + + @Test + @DirtiesContext + public void testMultiThreadContinueFromPreviousToken() throws Exception { + + tokenStore = spy(new InMemoryTokenStore()); + eventBus.publish(createEvents(10)); + TrackedEventMessage firstEvent = eventBus.openStream(null).nextAvailable(); + tokenStore.storeToken(firstEvent.trackingToken(), testSubject.getName(), 0); + assertEquals(firstEvent.trackingToken(), tokenStore.fetchToken(testSubject.getName(), 0)); + + + final AcknowledgeByThread acknowledgeByThread = new AcknowledgeByThread(); + CountDownLatch countDownLatch = new CountDownLatch(9); + doAnswer(invocation -> { + acknowledgeByThread.addMessage(Thread.currentThread(),(EventMessage) invocation.getArguments()[0]); + countDownLatch.countDown(); + return null; + }).when(mockListener).handle(any()); + + testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, + eventBus, tokenStore, new MessageSequenceNumberSequentialPolicy(), NoTransactionManager.INSTANCE, 1, NoOpMessageMonitor.INSTANCE); + + testSubject.start(); + + assertTrue("Expected 9 invocations on event listener by now", countDownLatch.await(60, TimeUnit.SECONDS)); + + acknowledgeByThread.assertEventsAckedByMultipleThreads(); + acknowledgeByThread.assertEventsAddUpTo(9); + } + +// @Test(timeout = 10000) + @Test + @DirtiesContext + public void testMultiContinueAfterPause() throws Exception { + + final AcknowledgeByThread acknowledgeByThread = new AcknowledgeByThread(); + + CountDownLatch countDownLatch = new CountDownLatch(2); + doAnswer(invocation -> { + acknowledgeByThread.addMessage(Thread.currentThread(), (EventMessage) invocation.getArguments()[0]); + countDownLatch.countDown(); + return null; + }).when(mockListener).handle(any()); + testSubject.start(); + + eventBus.publish(createEvents(2)); + + assertTrue("Expected 2 invocations on event listener by now", countDownLatch.await(5, TimeUnit.SECONDS)); + acknowledgeByThread.assertEventsAddUpTo(2); + + testSubject.pause(); + // The thread may block for 1 second waiting for a next event to pop up + while (testSubject.activeProcessorThreads() > 0) { + Thread.sleep(1); + // wait... + } + + CountDownLatch countDownLatch2 = new CountDownLatch(1); + doAnswer(invocation -> { + acknowledgeByThread.addMessage(Thread.currentThread(),(EventMessage) invocation.getArguments()[0]); + countDownLatch2.countDown(); + return null; + }).when(mockListener).handle(any()); + + eventBus.publish(createEvents(2)); + + assertEquals(1, countDownLatch2.getCount()); + + testSubject.start(); + assertTrue("Expected 4 invocations on event listener by now", countDownLatch2.await(5, TimeUnit.SECONDS)); + acknowledgeByThread.assertEventsAddUpTo(4); + + // batch size = 1 + verify(tokenStore, times(4)).storeToken(any(), anyString(), anyInt()); + } + + @Test + @DirtiesContext + public void testMultiProcessorGoesToRetryModeWhenOpenStreamFails() throws Exception { + eventBus = spy(eventBus); + + tokenStore = new InMemoryTokenStore(); + eventBus.publish(createEvents(5)); + when(eventBus.openStream(any())).thenThrow(new MockException()).thenCallRealMethod(); + + final AcknowledgeByThread acknowledgeByThread = new AcknowledgeByThread(); + CountDownLatch countDownLatch = new CountDownLatch(5); + doAnswer(invocation -> { + acknowledgeByThread.addMessage(Thread.currentThread(),(EventMessage) invocation.getArguments()[0]); + countDownLatch.countDown(); + return null; + }).when(mockListener).handle(any()); + + testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, eventBus, tokenStore, NoTransactionManager.INSTANCE); + testSubject.start(); + assertTrue("Expected 5 invocations on event listener by now", countDownLatch.await(10, TimeUnit.SECONDS)); + acknowledgeByThread.assertEventsAddUpTo(5); + verify(eventBus, times(2)).openStream(any()); + } + + @Test + public void testFirstTokenIsStoredWhenUnitOfWorkIsRolledBackOnSecondEvent() throws Exception { + List> events = createEvents(2); + CountDownLatch countDownLatch = new CountDownLatch(2); + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCommit(uow -> { + if (uow.getMessage().equals(events.get(1))) { + throw new MockException(); + } + }); + return interceptorChain.proceed(); + })); + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCleanup(uow -> countDownLatch.countDown()); + return interceptorChain.proceed(); + })); + testSubject.start(); + eventBus.publish(events); + assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5, TimeUnit.SECONDS)); + verify(tokenStore, atLeastOnce()).storeToken(any(), any(), anyInt()); + assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0)); + } + + @Test + @DirtiesContext + @SuppressWarnings("unchecked") + public void testEventsWithTheSameTokenAreProcessedInTheSameBatch() throws Exception { + eventBus.shutDown(); + + eventBus = mock(EmbeddedEventStore.class); + TrackingToken trackingToken = new GlobalSequenceTrackingToken(0); + List> events = + createEvents(2).stream().map(event -> asTrackedEventMessage(event, trackingToken)).collect(toList()); + when(eventBus.openStream(null)).thenReturn(trackingEventStreamOf(events.iterator())); + testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, eventBus, tokenStore, NoTransactionManager.INSTANCE); + + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCommit(uow -> { + if (uow.getMessage().equals(events.get(1))) { + throw new MockException(); + } + }); + return interceptorChain.proceed(); + })); + + CountDownLatch countDownLatch = new CountDownLatch(2); + + testSubject.registerInterceptor(((unitOfWork, interceptorChain) -> { + unitOfWork.onCleanup(uow -> countDownLatch.countDown()); + return interceptorChain.proceed(); + })); + + testSubject.start(); + + assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5, TimeUnit.SECONDS)); + verify(tokenStore, atLeastOnce()).storeToken(any(), any(), anyInt()); + assertNull(tokenStore.fetchToken(testSubject.getName(), 0)); + } + + // Utility to add up acknowledged messages by Thread (worker) name and assertions facilities. + class AcknowledgeByThread { + Map>> ackedEventsByThreadMap = new HashMap<>(); + + void addMessage(Thread handlingThread, EventMessage msg) { + ackedEventsByThreadMap.computeIfAbsent(handlingThread.getName(), k -> new ArrayList<>()).add(msg); + } + + void assertEventsAckedByMultipleThreads() { + ackedEventsByThreadMap.values().forEach(l -> assertThat(l.isEmpty(), is(false))); + } + + void assertEventsAddUpTo(int eventCount) { + assertThat(ackedEventsByThreadMap.values().stream().mapToLong(Collection::size).sum(), is(new Integer(eventCount).longValue())); + } + } + +} \ No newline at end of file diff --git a/core/src/test/java/org/axonframework/eventhandling/TrackingEventProcessorTest.java b/core/src/test/java/org/axonframework/eventhandling/TrackingEventProcessorTest.java index ed96e23824..9cab96e0e7 100644 --- a/core/src/test/java/org/axonframework/eventhandling/TrackingEventProcessorTest.java +++ b/core/src/test/java/org/axonframework/eventhandling/TrackingEventProcessorTest.java @@ -27,7 +27,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.springframework.test.annotation.DirtiesContext; import java.util.ArrayList; @@ -41,7 +40,6 @@ import static junit.framework.TestCase.*; import static org.axonframework.eventsourcing.eventstore.EventStoreTestUtils.createEvent; import static org.axonframework.eventsourcing.eventstore.EventStoreTestUtils.createEvents; -import static org.axonframework.eventsourcing.eventstore.EventStoreTestUtils.createUUIDEvents; import static org.axonframework.eventsourcing.eventstore.EventUtils.asTrackedEventMessage; import static org.mockito.Mockito.*; @@ -176,7 +174,6 @@ public void testContinueFromPreviousToken() throws Exception { }).when(mockListener).handle(any()); testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, eventBus, tokenStore, NoTransactionManager.INSTANCE); - // Upgrade our thread pool, so the dispatcher is not used as worker. testSubject.start(); assertTrue("Expected 9 invocations on event listener by now", countDownLatch.await(5, TimeUnit.SECONDS)); assertEquals(9, ackedEvents.size()); @@ -310,6 +307,9 @@ public void testEventsWithTheSameTokenAreProcessedInTheSameBatch() throws Except @Test public void testEventProcessorIsReEntrant() throws Exception { + testSubject.start(); + assertTrue("TrackingEventProcessor is not started", testSubject.getState() == TrackingEventProcessor.State.STARTED); + testSubject.shutDown(); testSubject.start(); CountDownLatch countDownLatch2 = new CountDownLatch(2); @@ -317,48 +317,8 @@ public void testEventProcessorIsReEntrant() throws Exception { countDownLatch2.countDown(); return null; }).when(mockListener).handle(any()); - eventBus.publish(createEvents(2)); assertTrue("Expected listener to have received 2 published events", countDownLatch2.await(5, TimeUnit.SECONDS)); - } - - - // Multicore tests... - - @Test - @DirtiesContext - public void testTwoThreadsContinueFromPreviousToken() throws Exception { - - tokenStore = spy(new InMemoryTokenStore()); - eventBus.publish(createUUIDEvents(10)); - TrackedEventMessage firstEvent = eventBus.openStream(null).nextAvailable(); - tokenStore.storeToken(firstEvent.trackingToken(), testSubject.getName(), 0); - assertEquals(firstEvent.trackingToken(), tokenStore.fetchToken(testSubject.getName(), 0)); - - List> ackedEventsThread0 = new ArrayList<>(); - List> ackedEventsThread1 = new ArrayList<>(); - CountDownLatch countDownLatch = new CountDownLatch(9); - doAnswer(invocation -> { - final EventMessage eventMessage = (EventMessage) invocation.getArguments()[0]; - final Thread thread = Thread.currentThread(); - if(thread.getName().equals("TrackingEventProcessor - test-0")){ - ackedEventsThread0.add(eventMessage); - }else if(thread.getName().equals("TrackingEventProcessor - test-1")){ - ackedEventsThread1.add(eventMessage); - } - countDownLatch.countDown(); - return null; - }).when(mockListener).handle(any()); - - testSubject = new TrackingEventProcessor("test", eventHandlerInvoker, eventBus, tokenStore, NoTransactionManager.INSTANCE); - // Upgrade our thread pool, so we auto-split in two segments. - testSubject.tweakThreadPool(2,2); - testSubject.start(); - assertTrue("Expected 9 invocations on event listener by now", countDownLatch.await(60, TimeUnit.SECONDS)); - assertEquals(9, ackedEventsThread0.size() + ackedEventsThread1.size()); - } - - }