diff --git a/core/src/main/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingToken.java b/core/src/main/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingToken.java index 537ffecaf5..c7cb364cf5 100644 --- a/core/src/main/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingToken.java +++ b/core/src/main/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingToken.java @@ -22,7 +22,11 @@ import org.axonframework.common.CollectionUtils; import java.io.Serializable; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.LongStream; /** @@ -42,11 +46,6 @@ public class GapAwareTrackingToken implements TrackingToken, Serializable { private final long index; private final SortedSet gaps; - private GapAwareTrackingToken(long index, SortedSet gaps) { - this.index = index; - this.gaps = gaps; - } - /** * Returns a new {@link GapAwareTrackingToken} instance based on the given {@code index} and collection of {@code * gaps}. @@ -63,12 +62,17 @@ public static GapAwareTrackingToken newInstance(@JsonProperty("index") long inde if (gaps.isEmpty()) { return new GapAwareTrackingToken(index, Collections.emptySortedSet()); } - SortedSet gapSet = new TreeSet<>(gaps); + SortedSet gapSet = new ConcurrentSkipListSet<>(gaps); Assert.isTrue(gapSet.last() < index, () -> String.format("Gap indices [%s] should all be smaller than head index [%d]", gaps, index)); return new GapAwareTrackingToken(index, gapSet); } + private GapAwareTrackingToken(long index, SortedSet gaps) { + this.index = index; + this.gaps = gaps; + } + /** * Returns a new {@link GapAwareTrackingToken} instance based on this token but which has advanced to given {@code * index}. Gaps that have fallen behind the index by more than the {@code maxGapOffset} will not be included in the @@ -105,7 +109,7 @@ public GapAwareTrackingToken advanceTo(long index, int maxGapOffset) { */ public GapAwareTrackingToken advanceTo(long index, int maxGapOffset, boolean allowGaps) { long newIndex; - SortedSet gaps = new TreeSet<>(this.gaps); + SortedSet gaps = new ConcurrentSkipListSet<>(this.gaps); if (gaps.remove(index)) { newIndex = this.index; } else if (index > this.index) { @@ -144,7 +148,7 @@ public GapAwareTrackingToken lowerBound(TrackingToken other) { Assert.isTrue(other instanceof GapAwareTrackingToken, () -> "Incompatible token type provided."); GapAwareTrackingToken otherToken = (GapAwareTrackingToken) other; - SortedSet mergedGaps = new TreeSet<>(this.gaps); + SortedSet mergedGaps = new ConcurrentSkipListSet<>(this.gaps); mergedGaps.addAll(otherToken.gaps); long mergedIndex = calculateIndex(otherToken, mergedGaps); mergedGaps.removeIf(i -> i >= mergedIndex); @@ -155,9 +159,9 @@ public GapAwareTrackingToken lowerBound(TrackingToken other) { public TrackingToken upperBound(TrackingToken otherToken) { Assert.isTrue(otherToken instanceof GapAwareTrackingToken, () -> "Incompatible token type provided."); GapAwareTrackingToken other = (GapAwareTrackingToken) otherToken; - SortedSet newGaps = CollectionUtils.intersect(this.gaps, other.gaps, TreeSet::new); + SortedSet newGaps = CollectionUtils.intersect(this.gaps, other.gaps, ConcurrentSkipListSet::new); long min = Math.min(this.index, other.index) + 1; - SortedSet mergedGaps = CollectionUtils.merge(this.gaps.tailSet(min), other.gaps.tailSet(min), TreeSet::new); + SortedSet mergedGaps = CollectionUtils.merge(this.gaps.tailSet(min), other.gaps.tailSet(min), ConcurrentSkipListSet::new); newGaps.addAll(mergedGaps); return new GapAwareTrackingToken(Math.max(this.index, other.index), newGaps); diff --git a/core/src/test/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingTokenTest.java b/core/src/test/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingTokenTest.java index 3711a5aaf9..0e6f15c2cf 100644 --- a/core/src/test/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingTokenTest.java +++ b/core/src/test/java/org/axonframework/eventsourcing/eventstore/GapAwareTrackingTokenTest.java @@ -20,6 +20,11 @@ import java.util.Collections; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -31,6 +36,33 @@ public class GapAwareTrackingTokenTest { + @Test + public void testGapAwareTokenConcurrency() throws InterruptedException { + AtomicLong counter = new AtomicLong(); + AtomicReference currentToken = new AtomicReference<>(GapAwareTrackingToken.newInstance(-1, emptySortedSet())); + + ExecutorService executorService = Executors.newCachedThreadPool(); + + // we need more threads than available processors, for a high likelihood to trigger this concurrency issue + int threadCount = Runtime.getRuntime().availableProcessors() + 1; + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + long deadline = System.currentTimeMillis() + 1000; + while (System.currentTimeMillis() < deadline) { + long next = counter.getAndIncrement(); + currentToken.getAndUpdate(t -> t.advanceTo(next, Integer.MAX_VALUE, true)); + } + }); + } + executorService.shutdown(); + assertTrue("ExecutorService not stopped within expected reasonable time frame", + executorService.awaitTermination(5, TimeUnit.SECONDS)); + + assertTrue("The test did not seem to have generated any tokens", counter.get() > 0); + assertEquals(counter.get() - 1, currentToken.get().getIndex()); + assertEquals(emptySortedSet(), currentToken.get().getGaps()); + } + @Test public void testAdvanceToWithoutGaps() { GapAwareTrackingToken subject = GapAwareTrackingToken.newInstance(0L, Collections.emptyList()); @@ -100,6 +132,8 @@ public void testTokenCoversOther() { assertFalse(token1.covers(token4)); assertFalse(token2.covers(token4)); assertFalse(token4.covers(token2)); + assertTrue(token4.covers(token5)); + assertFalse(token5.covers(token4)); assertFalse(token1.covers(token5)); assertFalse(token5.covers(token1)); assertFalse(token1.covers(token6));