Skip to content

Commit

Permalink
Fixed concurrency issue in GapAwareTrackingToken
Browse files Browse the repository at this point in the history
While the TreeSet itself was stored in a final field, the TreeSet's
mutations were not guaranteed to be visible to threads that got the
newly created copy.

Resolves #646
  • Loading branch information
abuijze committed Jul 5, 2018
1 parent a83259b commit 8fa8b68
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
Expand Up @@ -22,7 +22,11 @@
import org.axonframework.common.CollectionUtils;

import java.io.Serializable;
import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.LongStream;

/**
Expand All @@ -42,11 +46,6 @@ public class GapAwareTrackingToken implements TrackingToken, Serializable {
private final long index;
private final SortedSet<Long> gaps;

private GapAwareTrackingToken(long index, SortedSet<Long> gaps) {
this.index = index;
this.gaps = gaps;
}

/**
* Returns a new {@link GapAwareTrackingToken} instance based on the given {@code index} and collection of {@code
* gaps}.
Expand All @@ -63,12 +62,17 @@ public static GapAwareTrackingToken newInstance(@JsonProperty("index") long inde
if (gaps.isEmpty()) {
return new GapAwareTrackingToken(index, Collections.emptySortedSet());
}
SortedSet<Long> gapSet = new TreeSet<>(gaps);
SortedSet<Long> gapSet = new ConcurrentSkipListSet<>(gaps);
Assert.isTrue(gapSet.last() < index,
() -> String.format("Gap indices [%s] should all be smaller than head index [%d]", gaps, index));
return new GapAwareTrackingToken(index, gapSet);
}

private GapAwareTrackingToken(long index, SortedSet<Long> gaps) {
this.index = index;
this.gaps = gaps;
}

/**
* Returns a new {@link GapAwareTrackingToken} instance based on this token but which has advanced to given {@code
* index}. Gaps that have fallen behind the index by more than the {@code maxGapOffset} will not be included in the
Expand Down Expand Up @@ -105,7 +109,7 @@ public GapAwareTrackingToken advanceTo(long index, int maxGapOffset) {
*/
public GapAwareTrackingToken advanceTo(long index, int maxGapOffset, boolean allowGaps) {
long newIndex;
SortedSet<Long> gaps = new TreeSet<>(this.gaps);
SortedSet<Long> gaps = new ConcurrentSkipListSet<>(this.gaps);
if (gaps.remove(index)) {
newIndex = this.index;
} else if (index > this.index) {
Expand Down Expand Up @@ -144,7 +148,7 @@ public GapAwareTrackingToken lowerBound(TrackingToken other) {
Assert.isTrue(other instanceof GapAwareTrackingToken, () -> "Incompatible token type provided.");
GapAwareTrackingToken otherToken = (GapAwareTrackingToken) other;

SortedSet<Long> mergedGaps = new TreeSet<>(this.gaps);
SortedSet<Long> mergedGaps = new ConcurrentSkipListSet<>(this.gaps);
mergedGaps.addAll(otherToken.gaps);
long mergedIndex = calculateIndex(otherToken, mergedGaps);
mergedGaps.removeIf(i -> i >= mergedIndex);
Expand All @@ -155,9 +159,9 @@ public GapAwareTrackingToken lowerBound(TrackingToken other) {
public TrackingToken upperBound(TrackingToken otherToken) {
Assert.isTrue(otherToken instanceof GapAwareTrackingToken, () -> "Incompatible token type provided.");
GapAwareTrackingToken other = (GapAwareTrackingToken) otherToken;
SortedSet<Long> newGaps = CollectionUtils.intersect(this.gaps, other.gaps, TreeSet::new);
SortedSet<Long> newGaps = CollectionUtils.intersect(this.gaps, other.gaps, ConcurrentSkipListSet::new);
long min = Math.min(this.index, other.index) + 1;
SortedSet<Long> mergedGaps = CollectionUtils.merge(this.gaps.tailSet(min), other.gaps.tailSet(min), TreeSet::new);
SortedSet<Long> mergedGaps = CollectionUtils.merge(this.gaps.tailSet(min), other.gaps.tailSet(min), ConcurrentSkipListSet::new);
newGaps.addAll(mergedGaps);

return new GapAwareTrackingToken(Math.max(this.index, other.index), newGaps);
Expand Down
Expand Up @@ -20,6 +20,11 @@

import java.util.Collections;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -31,6 +36,33 @@

public class GapAwareTrackingTokenTest {

@Test
public void testGapAwareTokenConcurrency() throws InterruptedException {
AtomicLong counter = new AtomicLong();
AtomicReference<GapAwareTrackingToken> currentToken = new AtomicReference<>(GapAwareTrackingToken.newInstance(-1, emptySortedSet()));

ExecutorService executorService = Executors.newCachedThreadPool();

// we need more threads than available processors, for a high likelihood to trigger this concurrency issue
int threadCount = Runtime.getRuntime().availableProcessors() + 1;
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
long deadline = System.currentTimeMillis() + 1000;
while (System.currentTimeMillis() < deadline) {
long next = counter.getAndIncrement();
currentToken.getAndUpdate(t -> t.advanceTo(next, Integer.MAX_VALUE, true));
}
});
}
executorService.shutdown();
assertTrue("ExecutorService not stopped within expected reasonable time frame",
executorService.awaitTermination(5, TimeUnit.SECONDS));

assertTrue("The test did not seem to have generated any tokens", counter.get() > 0);
assertEquals(counter.get() - 1, currentToken.get().getIndex());
assertEquals(emptySortedSet(), currentToken.get().getGaps());
}

@Test
public void testAdvanceToWithoutGaps() {
GapAwareTrackingToken subject = GapAwareTrackingToken.newInstance(0L, Collections.emptyList());
Expand Down Expand Up @@ -100,6 +132,8 @@ public void testTokenCoversOther() {
assertFalse(token1.covers(token4));
assertFalse(token2.covers(token4));
assertFalse(token4.covers(token2));
assertTrue(token4.covers(token5));
assertFalse(token5.covers(token4));
assertFalse(token1.covers(token5));
assertFalse(token5.covers(token1));
assertFalse(token1.covers(token6));
Expand Down

0 comments on commit 8fa8b68

Please sign in to comment.