Skip to content

Commit

Permalink
Issue #221 added multithreading tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Christophe Bouhier committed Apr 19, 2017
1 parent f8e637b commit 4887c89
Show file tree
Hide file tree
Showing 6 changed files with 508 additions and 129 deletions.
@@ -0,0 +1,24 @@
package org.axonframework.eventhandling;

import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventsourcing.DomainEventMessage;

import java.util.Objects;
import java.util.UUID;

/**
 * A {@link SequencingPolicy} that sequences {@link DomainEventMessage}s by their
 * aggregate-local sequence number.
 * <p>
 * Note: a sequence number is only unique <em>within</em> a single aggregate's event
 * stream, not across aggregates. Events of different aggregates that share the same
 * sequence number will resolve to the same sequence identifier, so this policy does
 * NOT guarantee a globally unique identifier.
 *
 * @author Christophe Bouhier
 */
public class MessageSequenceNumberSequentialPolicy implements SequencingPolicy<EventMessage> {

    /**
     * Returns the aggregate sequence number of the given event when it is a
     * {@link DomainEventMessage}, or {@code null} (meaning: no sequencing
     * requirement) for any other event type.
     *
     * @param event the event to obtain a sequence identifier for
     * @return the aggregate-local sequence number, or {@code null} when the event
     *         carries none
     */
    @Override
    public Object getSequenceIdentifierFor(EventMessage event) {
        if (event instanceof DomainEventMessage<?>) {
            // Parameterized cast avoids the raw-type warning of the original.
            return ((DomainEventMessage<?>) event).getSequenceNumber();
        }
        return null;
    }
}

This file was deleted.

Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;


import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.axonframework.common.io.IOUtils.closeQuietly; import static org.axonframework.common.io.IOUtils.closeQuietly;
Expand Down Expand Up @@ -113,7 +114,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok
StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore,
TransactionManager transactionManager, int batchSize) { TransactionManager transactionManager, int batchSize) {
this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE,
messageSource, tokenStore, new SegmentingPerAggregatePolicy(), transactionManager, batchSize, NoOpMessageMonitor.INSTANCE); messageSource, tokenStore, new SequentialPerAggregatePolicy(), transactionManager, batchSize, NoOpMessageMonitor.INSTANCE);
} }


/** /**
Expand All @@ -135,7 +136,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok
TransactionManager transactionManager, TransactionManager transactionManager,
MessageMonitor<? super EventMessage<?>> messageMonitor) { MessageMonitor<? super EventMessage<?>> messageMonitor) {
this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, this(name, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE,
messageSource, tokenStore, new SegmentingPerAggregatePolicy(), transactionManager, 1, messageMonitor); messageSource, tokenStore, new SequentialPerAggregatePolicy(), transactionManager, 1, messageMonitor);
} }


/** /**
Expand All @@ -149,7 +150,7 @@ public TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvok
* @param messageSource The message source (e.g. Event Bus) which this event processor will track * @param messageSource The message source (e.g. Event Bus) which this event processor will track
* @param tokenStore Used to store and fetch event tokens that enable the processor to track its * @param tokenStore Used to store and fetch event tokens that enable the processor to track its
* progress * progress
* @param sequentialPolicy * @param sequentialPolicy The policy which will determine the segmentation identifier.
* @param transactionManager The transaction manager used when processing messages * @param transactionManager The transaction manager used when processing messages
* @param batchSize The maximum number of events to process in a single batch * @param batchSize The maximum number of events to process in a single batch
* @param messageMonitor Monitor to be invoked before and after event processing * @param messageMonitor Monitor to be invoked before and after event processing
Expand Down Expand Up @@ -183,16 +184,6 @@ public void start() {
if (!previousState.isRunning()) { if (!previousState.isRunning()) {
ensureRunningExecutor(); ensureRunningExecutor();
processingStrategy.startSegmentWorkers(); processingStrategy.startSegmentWorkers();
// Delegate to strategy to dispatch workers.

// executorService.submit(() -> {
// try {
// this.processingLoop();
// } catch (Throwable e) {
// logger.error("Processing loop ended due to uncaught exception. Processor pausing.", e);
// state.set(State.PAUSED_ERROR);
// }
// });
} }
} }


Expand All @@ -217,16 +208,6 @@ protected void processingLoop(TrackingSegmentWorker segmentWorker) {
long errorWaitTime = 1; long errorWaitTime = 1;
try { try {
while (state.get().isRunning()) { while (state.get().isRunning()) {

try{
if(segmentWorker.isDispatcher()){
processingStrategy.optimizeWorkers();
}
}catch(Exception e){

}


try { try {
eventStream = ensureEventStreamOpened(eventStream); eventStream = ensureEventStreamOpened(eventStream);
processBatch(segmentWorker, eventStream); processBatch(segmentWorker, eventStream);
Expand All @@ -245,7 +226,6 @@ protected void processingLoop(TrackingSegmentWorker segmentWorker) {
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.warn("Thread interrupted. Preparing to shut down event processor"); logger.warn("Thread interrupted. Preparing to shut down event processor");
// TODO, when in a worker thread, we can't shutdown the whole processing...
shutDown(); shutDown();
} }
errorWaitTime = Math.min(errorWaitTime * 2, 60); errorWaitTime = Math.min(errorWaitTime * 2, 60);
Expand All @@ -266,18 +246,15 @@ private void releaseToken(Segment segment) {
} }


private void processBatch(TrackingSegmentWorker segmentWorker, MessageStream<TrackedEventMessage<?>> eventStream) throws Exception { private void processBatch(TrackingSegmentWorker segmentWorker, MessageStream<TrackedEventMessage<?>> eventStream) throws Exception {

final Predicate<TrackedEventMessage<?>> trackedEventMessagePredicate = matchesSegmentPredicate(segmentWorker.getSegment());

List<TrackedEventMessage<?>> batch = new ArrayList<>(); List<TrackedEventMessage<?>> batch = new ArrayList<>();
try { try {
if (eventStream.hasNextAvailable(1, TimeUnit.SECONDS)) { if (eventStream.hasNextAvailable(1, TimeUnit.SECONDS)) {
while (batch.size() < batchSize && eventStream.hasNextAvailable()) { while (batch.size() < batchSize && eventStream.hasNextAvailable()) {

final TrackedEventMessage<?> trackedEventMessage = eventStream.nextAvailable(); final TrackedEventMessage<?> trackedEventMessage = eventStream.nextAvailable();

if (trackedEventMessagePredicate.test(trackedEventMessage)) {
if (sequentialPolicy instanceof SegmentingPerAggregatePolicy) {
if (((SegmentingPerAggregatePolicy) sequentialPolicy).matches(segmentWorker.getSegment(), trackedEventMessage)) {
batch.add(trackedEventMessage);
}
} else {
batch.add(trackedEventMessage); batch.add(trackedEventMessage);
} }
} }
Expand All @@ -293,20 +270,48 @@ private void processBatch(TrackingSegmentWorker segmentWorker, MessageStream<Tra
segmentWorker.setLastToken(batch.get(batch.size() - 1).trackingToken()); segmentWorker.setLastToken(batch.get(batch.size() - 1).trackingToken());
final TrackingToken lastToken = segmentWorker.getLastToken(); final TrackingToken lastToken = segmentWorker.getLastToken();
while (lastToken != null && eventStream.peek().filter(event -> lastToken.equals(event.trackingToken())).isPresent()) { while (lastToken != null && eventStream.peek().filter(event -> lastToken.equals(event.trackingToken())).isPresent()) {
// TODO, support sequential policy here as well. final TrackedEventMessage<?> trackedEventMessage = eventStream.nextAvailable();
batch.add(eventStream.nextAvailable()); if (trackedEventMessagePredicate.test(trackedEventMessage)) {
batch.add(trackedEventMessage);
}
} }


process(batch); process(batch);


} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e); logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
// TODO, when in a worker thread, we can't shutdown the whole processing...
this.shutDown(); this.shutDown();
} }
} }


/**
 * Builds a {@link Predicate} that evaluates to {@code true} when the tested
 * {@link TrackedEventMessage} belongs to the given {@link Segment}.
 * <p>
 * Implementation note: the message's sequence identifier, as resolved by the
 * configured sequencing policy, is hashed to a numeric value which is then matched
 * against the segment's mask via {@link Segment#matches(long)}.
 *
 * @param segment the segment candidate messages are matched against
 * @return a predicate returning {@code true} for messages this segment should
 *         process; messages without a sequence identifier never match
 */
protected Predicate<TrackedEventMessage<?>> matchesSegmentPredicate(Segment segment) {
    return message -> {
        final Object sequenceIdentifier = sequentialPolicy.getSequenceIdentifierFor(message);
        if (sequenceIdentifier == null) {
            // No sequencing requirement for this message, so it matches no segment.
            // NOTE(review): a null identifier currently matches NO segment, meaning
            // such events are skipped by every worker — confirm this is intended
            // (consider letting the root segment, id 0, process them).
            return false;
        }
        // Primitive long suffices; the original boxed into a Long only to unbox
        // again via intValue(). hashCode() is an int, so the sign-extended value
        // fits a long exactly.
        final long numericIdentifier = Objects.hashCode(sequenceIdentifier);
        final boolean matches = segment.matches(numericIdentifier);
        // Debug level: this lambda runs once per event and would flood the log
        // at info level in the hot processing loop.
        logger.debug("{} policy for segment: {}, with identifier (binary): {}",
                     matches ? "Matching" : "Not Matching",
                     segment.getSegmentId(), Integer.toBinaryString((int) numericIdentifier));
        return matches;
    };
}

private MessageStream<TrackedEventMessage<?>> ensureEventStreamOpened( private MessageStream<TrackedEventMessage<?>> ensureEventStreamOpened(
MessageStream<TrackedEventMessage<?>> eventStreamIn) { MessageStream<TrackedEventMessage<?>> eventStreamIn) {
MessageStream<TrackedEventMessage<?>> eventStream = eventStreamIn; MessageStream<TrackedEventMessage<?>> eventStream = eventStreamIn;
Expand Down Expand Up @@ -426,9 +431,7 @@ public void setLastToken(TrackingToken lastToken) {
public void run() { public void run() {
try { try {
processingLoop(this); processingLoop(this);
// System.out.println("processing loop done");
} catch (Throwable e) { } catch (Throwable e) {
// TODO state change on a single worker...
logger.error("Processing loop ended due to uncaught exception. Processor pausing.", e); logger.error("Processing loop ended due to uncaught exception. Processor pausing.", e);
state.set(State.PAUSED_ERROR); state.set(State.PAUSED_ERROR);
} }
Expand All @@ -455,6 +458,8 @@ public boolean isDispatcher() {
public class AsyncTrackingEventProcessingStrategy { public class AsyncTrackingEventProcessingStrategy {


int[] storedSegments; int[] storedSegments;
boolean threadPoolChanged = false;



public void startSegmentWorkers() { public void startSegmentWorkers() {


Expand All @@ -464,45 +469,74 @@ public void startSegmentWorkers() {
// TODO, algo to determine the segmentation, We have 3 possible cases. // TODO, algo to determine the segmentation, We have 3 possible cases.
// 1. segment == thread max pool size => One thread per segment. // 1. segment == thread max pool size => One thread per segment.
// 2. segment > thread max pool size => handle multiple identifiers.. // 2. segment > thread max pool size => handle multiple identifiers..
// 3. segment < thread max pool size => Start splitting segments, accross number of available threads. // 3. segment < thread max pool size => Start splitting segments, across number of available threads.
// Note, when the SequentialPolicy is not capable to distinguish messages, there is no point in segmenting the stream of messages! // Note, when the SequentialPolicy is not capable to distinguish messages, there is no point in segmenting the stream of messages!
// Currently these two constructs are not synched. // Currently these two constructs are not synched.


executorService.getCorePoolSize();

storedSegments = tokenStore.fetchSegments(getName()); storedSegments = tokenStore.fetchSegments(getName());
Segment[] segments = Segment.computeSegments(storedSegments); Segment[] segments = Segment.computeSegments(storedSegments);


if(segments.length == 1 && (threadsRemaining() + 1) > segments.length){ // Split the root segment in at least two segments.
// Split our segment according to the thread count. if (segments.length == 1 && (threadsRemaining() + 1) > segments.length) {
segments = segments[0].split(); segments = segments[0].split();
} }


// Optimize the segments, for the number of threads.
// Note this could be done when running...
if (segments.length > threadsRemaining()) {
// TODO wait for functionality to compare token 'distance'.
// when two tokens are 'adjacent' within the boundaries of the Segment mask, these would be mergeable.
// The worker handling the most recent token, would wait until a candidate merge worker is 'adjacent'.

// See which segments are mergeable.
// final List<TrackingToken> sortedByIndexTokens = Stream.of(segments)
// .map(segment -> tokenStore.fetchToken(getName(), segment.getSegmentId()))
// .map(trackingToken -> {
// long index = 0;
// if (trackingToken instanceof GlobalSequenceTrackingToken) {
// index = ((GlobalSequenceTrackingToken) trackingToken).getGlobalIndex();
// }else if(trackingToken instanceof GapAwareTrackingToken){
// index = ((GapAwareTrackingToken) trackingToken).getIndex();
// }
// return index;
// })
// .sorted()
// .collect(Collectors.toList());
}
TrackingSegmentWorker workingInCurrentThread = null;
for (Segment s : segments) { for (Segment s : segments) {
final TrackingSegmentWorker trackingSegmentWorker = new TrackingSegmentWorker(s); TrackingSegmentWorker trackingSegmentWorker = new TrackingSegmentWorker(s);
// TODO Do we de-register properly....
registerInterceptor(trackingSegmentWorker.getEventMessageMessageHandlerInterceptor()); registerInterceptor(trackingSegmentWorker.getEventMessageMessageHandlerInterceptor());
if (threadsRemaining() > 0) { if (threadsRemaining() > 0) {
executorService.submit(trackingSegmentWorker); executorService.submit(trackingSegmentWorker);
} else { } else {
// Our dispatcher becomes a worker....no more dispatching.... workingInCurrentThread = trackingSegmentWorker;
trackingSegmentWorker.run(); break;
// Our dispatcher becomes a worker....no more dispatching....which lead to segments not being dispatched, segment > threads.
} }
} }
if(Objects.nonNull(workingInCurrentThread)){
workingInCurrentThread.run();
}
logger.info("Exiting dispatching thread");
}); });
} }


public int threadsRemaining(){ public int threadsRemaining() {
return executorService.getMaximumPoolSize() - executorService.getActiveCount(); return executorService.getMaximumPoolSize() - executorService.getActiveCount();
} }


public void optimizeWorkers(){ public void optimizeWorkers() {

if (threadPoolChanged) {
final int activeCount = executorService.getActiveCount(); // Any reason... why we should optimize, anything changed to our thread pool?
if( storedSegments.length < activeCount){ final int activeCount = executorService.getActiveCount();
// stop our execution, and re-init with the new executor settings. (core and max pool size). if (storedSegments.length < activeCount) {
executorService.shutdown();; // stop our execution, and re-init with the new executor settings. (core and max pool size).
ensureRunningExecutor(); executorService.shutdown();
// do this stuff later. ensureRunningExecutor();
// do this stuff later.
}
} }
} }
} }
Expand Down
Expand Up @@ -39,19 +39,19 @@ public class SegmentTest {
private static final Logger LOG = LoggerFactory.getLogger(SegmentTest.class); private static final Logger LOG = LoggerFactory.getLogger(SegmentTest.class);


private List<DomainEventMessage> domainEventMessages; private List<DomainEventMessage> domainEventMessages;
private SegmentingPerAggregatePolicy sequentialPolicy;


@Before @Before
public void before() { public void before() {
domainEventMessages = produceEvents(); domainEventMessages = produceEvents();

sequentialPolicy = new SegmentingPerAggregatePolicy();
} }


@Test @Test
public void testSegmentSplitAddsUp() { public void testSegmentSplitAddsUp() {


final List<Long> identifiers = domainEventMessages.stream().map(de -> sequentialPolicy.toLong(de.getAggregateIdentifier())).collect(Collectors.toList()); final List<Long> identifiers = domainEventMessages.stream().map(de -> {
final String aggregateIdentifier = de.getAggregateIdentifier();
return UUID.fromString(aggregateIdentifier).getLeastSignificantBits();
}).collect(Collectors.toList());


// segment 0, mask 0; // segment 0, mask 0;
final long count = identifiers.stream().filter(Segment.ROOT_SEGMENT::matches).count(); final long count = identifiers.stream().filter(Segment.ROOT_SEGMENT::matches).count();
Expand Down

0 comments on commit 4887c89

Please sign in to comment.