Skip to content

Commit

Permalink
Move initializing the token store for PSEP to the coordinator.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Jul 12, 2022
1 parent b9a22b7 commit 096db9b
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.common;

import java.io.Serializable;

/**
* Exception indicating that an action was retried a maximum times, without a good result.
*
* @author Gerard Klijs
* @since 4.5.13
*/
public class AxonRetriesExhaustedException extends AxonException implements Serializable {

private static final long serialVersionUID = 6920545831431694106L;

/**
* Initializes the exception using the given {@code message}.
*
* @param message The message describing the exception
*/
public AxonRetriesExhaustedException(String message) {
super(message);
}

/**
* Initializes the exception using the given {@code message} and {@code cause}.
*
* @param message The message describing the exception
* @param cause The underlying cause of the exception
*/
public AxonRetriesExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.axonframework.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

Expand Down Expand Up @@ -57,10 +58,17 @@ public static void executeWithRetry(Runnable runnable, Predicate<RuntimeExceptio
*
* @param runnable action to execute
* @param retryInterval time to wait between retries of the action
* @param maxTries maximum number of times the action is invoked,
*/
public static void executeUntilTrue(BooleanSupplier runnable, long retryInterval) {
public static void executeUntilTrue(BooleanSupplier runnable, long retryInterval, long maxTries) {
AtomicLong totalTriesCounter = new AtomicLong();
boolean result = runnable.getAsBoolean();
while (!result) {
if (totalTriesCounter.incrementAndGet() >= maxTries){
throw new AxonRetriesExhaustedException(String.format(
"Tried invoking the action for %d times, without the result being true",
maxTries));
}
try {
Thread.sleep(retryInterval);
result = runnable.getAsBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.axonframework.common.ProcessUtils.executeUntilTrue;
import static org.axonframework.common.io.IOUtils.closeQuietly;

/**
Expand Down Expand Up @@ -89,13 +91,15 @@ class Coordinator {
private final long claimExtensionThreshold;
private final Clock clock;
private final int maxClaimedSegments;

private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
private final Map<Integer, WorkPackage> workPackages = new ConcurrentHashMap<>();
private final AtomicReference<RunState> runState;
private final Map<Integer, Instant> releasesDeadlines = new ConcurrentHashMap<>();
private int errorWaitBackOff = 500;
private final Queue<CoordinatorTask> coordinatorTasks = new ConcurrentLinkedQueue<>();
private final AtomicReference<CoordinationTask> coordinationTask = new AtomicReference<>();
private final AtomicBoolean tokenStoreInitialized = new AtomicBoolean(false);

/**
* Instantiate a Builder to be able to create a {@link Coordinator}. This builder <b>does not</b> validate the
Expand All @@ -121,6 +125,8 @@ private Coordinator(Builder builder) {
this.claimExtensionThreshold = builder.claimExtensionThreshold;
this.clock = builder.clock;
this.maxClaimedSegments = builder.maxClaimedSegments;
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.runState = new AtomicReference<>(RunState.initial(builder.shutdownAction));
}

Expand Down Expand Up @@ -253,6 +259,23 @@ public CompletableFuture<Boolean> mergeSegment(int segmentId) {
return result;
}

private boolean initializeTokenStore() {
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
tokenStoreInitialized.set(true);
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
}
});
return tokenStoreInitialized.get();
}

/**
* Status holder for this service. Defines whether it is running, has been started (to ensure double {@link
* #start()} invocations do not restart this coordinator) and maintains a shutdown handler to complete
Expand Down Expand Up @@ -353,6 +376,9 @@ static class Builder {
private long claimExtensionThreshold = 5000;
private Clock clock = GenericEventMessage.clock;
private int maxClaimedSegments;
private int initialSegmentCount = 16;
private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken =
StreamableMessageSource::createTailToken;
private Runnable shutdownAction = () -> {
};

Expand Down Expand Up @@ -510,6 +536,32 @@ Builder maxClaimedSegments(int maxClaimedSegments) {
return this;
}

/**
* Sets the initial segment count used to create segments on start up.
*
* @param initialSegmentCount an {@code int} specifying the initial segment count used to create segments on
* start up
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialSegmentCount(int initialSegmentCount) {
this.initialSegmentCount = initialSegmentCount;
return this;
}

/**
* Specifies the {@link Function} used to generate the initial {@link TrackingToken}s.
*
* @param initialToken a {@link Function} generating the initial {@link TrackingToken} based on a given {@link
* StreamableMessageSource}
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialToken(
Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken
) {
this.initialToken = initialToken;
return this;
}

/**
* Registers an action to perform when the coordinator shuts down. Will override any previously registered
* actions. Defaults to a no-op.
Expand Down Expand Up @@ -594,6 +646,10 @@ public void run() {
return;
}

if (!tokenStoreInitialized.get()) {
executeUntilTrue(Coordinator.this::initializeTokenStore, 100L, 30L);
}

workPackages.entrySet().stream()
.filter(entry -> isSegmentBlockedFromClaim(entry.getKey()))
.map(Map.Entry::getValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public class PooledStreamingEventProcessor extends AbstractEventProcessor implem
private final TransactionManager transactionManager;
private final ScheduledExecutorService workerExecutor;
private final Coordinator coordinator;
private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
private final long tokenClaimInterval;
private final int maxClaimedSegments;
Expand Down Expand Up @@ -163,7 +162,6 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
this.tokenStore = builder.tokenStore;
this.transactionManager = builder.transactionManager;
this.workerExecutor = builder.workerExecutorBuilder.apply(name);
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.tokenClaimInterval = builder.tokenClaimInterval;
this.maxClaimedSegments = builder.maxClaimedSegments;
Expand All @@ -185,35 +183,18 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
.claimExtensionThreshold(claimExtensionThreshold)
.clock(clock)
.maxClaimedSegments(maxClaimedSegments)
.initialSegmentCount(builder.initialSegmentCount)
.initialToken(initialToken)
.build();
}

@StartHandler(phase = Phase.INBOUND_EVENT_CONNECTORS)
@Override
public void start() {
logger.info("Starting PooledStreamingEventProcessor [{}].", name);
executeUntilTrue(this::initializeTokenStore, 100L);
coordinator.start();
}

private boolean initializeTokenStore() {
AtomicBoolean result = new AtomicBoolean(true);
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
result.set(false);
}
});
return result.get();
}

@Override
public void shutDown() {
shutdownAsync().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ void executeWithRetryImmediatelyStopsOnOther() {
@Test
void executeUntilTrueRetries(){
AtomicLong retryCounter = new AtomicLong();
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 1, 10L);
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 1, 10L, 10L);
assertEquals(2, retryCounter.get());
}

@Test
void executeUntilTrueThrowsWhenMaxRetriesReached(){
AtomicLong retryCounter = new AtomicLong();
assertThrows(AxonRetriesExhaustedException.class, () ->
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 100, 1L, 10L)
);
assertEquals(10, retryCounter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CoordinatorTest {
private final Segment SEGMENT_ZERO = computeSegment(0);
private final int SEGMENT_ID = 0;
private final int[] SEGMENT_IDS = {SEGMENT_ID};
private final int[] EMPTY_SEGMENT_IDS = {};

private final TokenStore tokenStore = mock(TokenStore.class);
private final ScheduledThreadPoolExecutor executorService = mock(ScheduledThreadPoolExecutor.class);
Expand Down Expand Up @@ -103,6 +104,26 @@ void testIfCoordinationTaskRescheduledAfterTokenReleaseClaimFails() {

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
// should be zero since we mock there already is a segment
verify(tokenStore, times(0)).initializeTokenSegments(anyString(), anyInt(), any(TrackingToken.class));
}

@Test
void testIfCoordinationTaskInitializesTokenStoreWhenNeeded() {
//arrange
final GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(0);

doReturn(EMPTY_SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
doReturn(token).when(tokenStore).fetchToken(eq(PROCESSOR_NAME), anyInt());
doReturn(SEGMENT_ZERO).when(workPackage).segment();
doAnswer(runTaskSync()).when(executorService).submit(any(Runnable.class));

//act
testSubject.start();

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
verify(tokenStore, times(1)).initializeTokenSegments(anyString(), anyInt(), isNull());
}

@SuppressWarnings("rawtypes") // Mockito cannot deal with the wildcard generics of the TrackedEventMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ void testRetriesWhenTokenInitializationInitiallyFails() {
.collect(Collectors.toList());
events.forEach(stubMessageSource::publishMessage);
mockEventHandlerInvoker();

testSubject.start();

assertTrue(testSubject.isRunning());
Expand Down Expand Up @@ -730,14 +729,14 @@ void testResetTokens() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens();

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -755,14 +754,14 @@ void testResetTokensWithContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -781,14 +780,14 @@ void testResetTokensFromDefinedPosition() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken);

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand All @@ -807,14 +806,14 @@ void testResetTokensFromDefinedPositionAndWithResetContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken, expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ void testConfigurePooledStreamingEventProcessor() {
assertNotNull(pooledProcessor);
assertEquals(PooledStreamingEventProcessor.class, pooledProcessor.getClass());

int resultInitialSegmentCount = ReflectionUtils.getFieldValue(
PooledStreamingEventProcessor.class.getDeclaredField("initialSegmentCount"), pooledProcessor
);
assertEquals(12, resultInitialSegmentCount);

long resultTokenClaimInterval = ReflectionUtils.getFieldValue(
PooledStreamingEventProcessor.class.getDeclaredField("tokenClaimInterval"), pooledProcessor
);
Expand Down

0 comments on commit 096db9b

Please sign in to comment.