Skip to content

Commit

Permalink
Fixed issue with potential disagreement of SegmentIDs in use
Browse files Browse the repository at this point in the history
When initializing segment IDs, the local representation might have been
different from the representation in the TokenStore. This commit ensures
that the representation in the TokenStore is always leading.

Resolves issue #508
  • Loading branch information
abuijze committed Feb 22, 2018
1 parent 759221a commit f74beae
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 19 deletions.
11 changes: 7 additions & 4 deletions core/src/main/java/org/axonframework/eventhandling/Segment.java
Expand Up @@ -77,14 +77,17 @@ public static Segment[] computeSegments(int... segments) {
* @return a collection of {@link Segment}'s.
*/
public static List<Segment> splitBalanced(Segment segment, int numberOfTimes) {

final LinkedList<Segment> toBeSplit = new LinkedList<>();
final SortedSet<Segment> toBeSplit = new TreeSet<>(Comparator.comparing(Segment::getMask)
.thenComparing(Segment::getSegmentId));
toBeSplit.add(segment);
for (int i = 0; i < numberOfTimes; i++) {
final Segment workingSegment = toBeSplit.pollFirst();
final Segment workingSegment = toBeSplit.first();
toBeSplit.remove(workingSegment);
toBeSplit.addAll(Arrays.asList(workingSegment.split()));
}
return toBeSplit;
ArrayList<Segment> result = new ArrayList<>(toBeSplit);
result.sort(Comparator.comparing(Segment::getSegmentId));
return result;
}

/**
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2017. Axon Framework
* Copyright (c) 2010-2018. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -493,16 +493,17 @@ private class WorkerLauncher implements Runnable {
public void run() {
while (getState().isRunning()) {
String processorName = TrackingEventProcessor.this.getName();
final int[] tokenStoreCurrentSegments = tokenStore.fetchSegments(processorName);
Segment[] segments = Segment.computeSegments(tokenStoreCurrentSegments);
int[] tokenStoreCurrentSegments = tokenStore.fetchSegments(processorName);

// When in an initial stage, split segments to the requested number.
if (tokenStoreCurrentSegments.length == 0 && segments.length == 1 && segments.length < segmentsSize) {
segments = Segment.splitBalanced(segments[0], segmentsSize - 1).toArray(new Segment[segmentsSize]);
transactionManager.executeInTransaction(() -> {
tokenStore.initializeTokenSegments(processorName, segmentsSize);
});
if (tokenStoreCurrentSegments.length == 0 && segmentsSize > 0) {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
() -> {
tokenStore.initializeTokenSegments(processorName, segmentsSize);
return tokenStore.fetchSegments(processorName);
});
}
Segment[] segments = Segment.computeSegments(tokenStoreCurrentSegments);

// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
Expand Down
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2010-2017. Axon Framework
* Copyright (c) 2010-2018. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -26,8 +27,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;

Expand All @@ -52,7 +55,7 @@ public void testSegmentSplitAddsUp() {

// segment 0, mask 0;
final long count = identifiers.stream().filter(Segment.ROOT_SEGMENT::matches).count();
assertThat(count, is((long)identifiers.size()));
assertThat(count, is((long) identifiers.size()));

final Segment[] splitSegment = Segment.ROOT_SEGMENT.split();

Expand Down Expand Up @@ -125,10 +128,28 @@ public void testSegmentSplit() {

@Test
public void testSegmentSplitNTimes() {
{
//
final List<Segment> segmentMasks = Segment.splitBalanced(Segment.ROOT_SEGMENT, 5);
assertThat(segmentMasks.size(), is(6));
assertThat(segmentMasks.get(5), equalTo(new Segment(5, 0x7)));
assertThat(segmentMasks.get(0), equalTo(new Segment(0, 0x7)));
assertThat(segmentMasks.get(1), equalTo(new Segment(1, 0x7)));
assertThat(segmentMasks.get(2), equalTo(new Segment(2, 0x3)));
assertThat(segmentMasks.get(3), equalTo(new Segment(3, 0x3)));
assertThat(segmentMasks.get(4), equalTo(new Segment(4, 0x7)));
}
}

final List<Segment> splits = Segment.splitBalanced(Segment.ROOT_SEGMENT, 10);

assertThat(splits.size(), is(11));
@Test
public void testSplitFromRootSegmentAlwaysYieldsSequentialSegmentIds() {
for (int i = 0; i < 500; i++) {
List<Segment> segments = Segment.splitBalanced(Segment.ROOT_SEGMENT, i);
assertEquals(i + 1, segments.size());
for (int j = 0; j < i; j++) {
assertEquals(j, segments.get(j).getSegmentId());
}
}
}

@Test
Expand Down Expand Up @@ -242,6 +263,19 @@ public void testSegmentResolve() {
assertThat(segmentMasks[3].getMask(), is(0x7));
assertThat(segmentMasks[4].getMask(), is(0x7));
}

{
//
final int[] segments = {0, 1, 2, 3, 4, 5};
final Segment[] segmentMasks = Segment.computeSegments(segments);
assertThat(segmentMasks.length, is(6));
assertThat(segmentMasks[0], equalTo(new Segment(0, 0x7)));
assertThat(segmentMasks[1], equalTo(new Segment(1, 0x7)));
assertThat(segmentMasks[2], equalTo(new Segment(2, 0x3)));
assertThat(segmentMasks[3], equalTo(new Segment(3, 0x3)));
assertThat(segmentMasks[4], equalTo(new Segment(4, 0x7)));
assertThat(segmentMasks[5], equalTo(new Segment(5, 0x7)));
}
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -262,6 +296,16 @@ public void testSegmentSplitOnBoundary() {
assertThat(splitSegment[1].getMask(), is(Integer.MAX_VALUE));
}

@Test
public void testItemsAssignedToOnlyOneSegment() {
for (int j = 0; j < 10; j++) {
List<Segment> segments = Segment.splitBalanced(Segment.ROOT_SEGMENT, ThreadLocalRandom.current().nextInt(50) + 1);
for (int i = 0; i < 100_000; i++) {
String value = UUID.randomUUID().toString();
assertEquals(1, segments.stream().filter(s -> s.matches(value)).count());
}
}
}

private List<DomainEventMessage> produceEvents() {
final ArrayList<DomainEventMessage> events = new ArrayList<>();
Expand All @@ -276,6 +320,6 @@ private List<DomainEventMessage> produceEvents() {

private DomainEventMessage newStubDomainEvent(String aggregateIdentifier) {
return new GenericDomainEventMessage<>("type", aggregateIdentifier, (long) 0,
new Object(), MetaData.emptyInstance());
new Object(), MetaData.emptyInstance());
}
}
Expand Up @@ -112,8 +112,21 @@ public void testProcessorInitializesMoreTokensThanWorkerCount() throws Interrupt
assertArrayEquals(new int[]{0, 1, 2, 3}, actual);
}

// Reproduce issue #508 (https://github.com/AxonFramework/AxonFramework/issues/508)
@Test
public void testProcessorWorkerCountWithMultipleSegments() throws InterruptedException {
public void testProcessorInitializesAndUsesSameTokens() {
configureProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(6)
.andInitialSegmentsCount(6));
testSubject.start();

assertWithin(5, SECONDS, () -> {assertThat(testSubject.activeProcessorThreads(), is(6));});
int[] actual = tokenStore.fetchSegments(testSubject.getName());
Arrays.sort(actual);
assertArrayEquals(new int[]{0, 1, 2, 3, 4, 5}, actual);
}

@Test
public void testProcessorWorkerCountWithMultipleSegments() {

tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
Expand Down

0 comments on commit f74beae

Please sign in to comment.