Skip to content

Commit

Permalink
Merge pull request #2277 from AxonFramework/fix_issue_2274
Browse files Browse the repository at this point in the history
Retry to initialize the token store correctly on exception for PSEP.
  • Loading branch information
gklijs committed Jul 12, 2022
2 parents d21a1d9 + 0744e39 commit 7dc550f
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.common;

/**
* Exception indicating that an action was retried a maximum times, without a good result.
*
* @author Gerard Klijs
* @since 4.5.13
*/
public class ProcessRetriesExhaustedException extends AxonException {

private static final long serialVersionUID = 6920545831431694106L;

/**
* Initializes the exception using the given {@code message}.
*
* @param message The message describing the exception
*/
public ProcessRetriesExhaustedException(String message) {
super(message);
}

/**
* Initializes the exception using the given {@code message} and {@code cause}.
*
* @param message The message describing the exception
* @param cause The underlying cause of the exception
*/
public ProcessRetriesExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
28 changes: 28 additions & 0 deletions messaging/src/main/java/org/axonframework/common/ProcessUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.axonframework.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -49,4 +51,30 @@ public static void executeWithRetry(Runnable runnable, Predicate<RuntimeExceptio

throw lastException;
}

/**
* Executes an action, with potential retry in case the result is false. Exception handling should be taken care of
* within the action if needed.
*
* @param runnable action to execute, will be executed till the result is true, or max tries is reached
* @param retryInterval time to wait between retries of the action
* @param maxTries maximum number of times the action is invoked
*/
public static void executeUntilTrue(BooleanSupplier runnable, long retryInterval, long maxTries) {
AtomicLong totalTriesCounter = new AtomicLong();
boolean result = runnable.getAsBoolean();
while (!result) {
if (totalTriesCounter.incrementAndGet() >= maxTries){
throw new ProcessRetriesExhaustedException(String.format(
"Tried invoking the action for %d times, without the result being true",
maxTries));
}
try {
Thread.sleep(retryInterval);
result = runnable.getAsBoolean();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.axonframework.common.ProcessUtils.executeUntilTrue;
import static org.axonframework.common.io.IOUtils.closeQuietly;

/**
Expand Down Expand Up @@ -89,6 +91,8 @@ class Coordinator {
private final long claimExtensionThreshold;
private final Clock clock;
private final int maxClaimedSegments;
private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;

private final Map<Integer, WorkPackage> workPackages = new ConcurrentHashMap<>();
private final AtomicReference<RunState> runState;
Expand Down Expand Up @@ -121,6 +125,8 @@ private Coordinator(Builder builder) {
this.claimExtensionThreshold = builder.claimExtensionThreshold;
this.clock = builder.clock;
this.maxClaimedSegments = builder.maxClaimedSegments;
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.runState = new AtomicReference<>(RunState.initial(builder.shutdownAction));
}

Expand All @@ -133,6 +139,7 @@ public void start() {
if (newState.wasStarted()) {
logger.debug("Starting Coordinator for Processor [{}].", name);
try {
executeUntilTrue(Coordinator.this::initializeTokenStore, 100L, 30L);
CoordinationTask task = new CoordinationTask();
executorService.submit(task);
this.coordinationTask.set(task);
Expand Down Expand Up @@ -253,6 +260,24 @@ public CompletableFuture<Boolean> mergeSegment(int segmentId) {
return result;
}

private boolean initializeTokenStore() {
AtomicBoolean tokenStoreInitialized = new AtomicBoolean(false);
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
tokenStoreInitialized.set(true);
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
}
});
return tokenStoreInitialized.get();
}

/**
* Status holder for this service. Defines whether it is running, has been started (to ensure double {@link
* #start()} invocations do not restart this coordinator) and maintains a shutdown handler to complete
Expand Down Expand Up @@ -353,6 +378,9 @@ static class Builder {
private long claimExtensionThreshold = 5000;
private Clock clock = GenericEventMessage.clock;
private int maxClaimedSegments;
private int initialSegmentCount = 16;
private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken =
StreamableMessageSource::createTailToken;
private Runnable shutdownAction = () -> {
};

Expand Down Expand Up @@ -510,6 +538,33 @@ Builder maxClaimedSegments(int maxClaimedSegments) {
return this;
}

/**
* Sets the initial segment count used to create segments on start up. Defaults to 16.
*
* @param initialSegmentCount an {@code int} specifying the initial segment count used to create segments on
* start up
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialSegmentCount(int initialSegmentCount) {
this.initialSegmentCount = initialSegmentCount;
return this;
}

/**
* Specifies the {@link Function} used to generate the initial {@link TrackingToken}s. Defaults to
* {@link StreamableMessageSource::createTailToken}
*
* @param initialToken a {@link Function} generating the initial {@link TrackingToken} based on a given
* {@link StreamableMessageSource}
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialToken(
Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken
) {
this.initialToken = initialToken;
return this;
}

/**
* Registers an action to perform when the coordinator shuts down. Will override any previously registered
* actions. Defaults to a no-op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class PooledStreamingEventProcessor extends AbstractEventProcessor implem
private final TransactionManager transactionManager;
private final ScheduledExecutorService workerExecutor;
private final Coordinator coordinator;
private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
private final long tokenClaimInterval;
private final int maxClaimedSegments;
Expand Down Expand Up @@ -161,7 +160,6 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
this.tokenStore = builder.tokenStore;
this.transactionManager = builder.transactionManager;
this.workerExecutor = builder.workerExecutorBuilder.apply(name);
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.tokenClaimInterval = builder.tokenClaimInterval;
this.maxClaimedSegments = builder.maxClaimedSegments;
Expand All @@ -183,32 +181,18 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
.claimExtensionThreshold(claimExtensionThreshold)
.clock(clock)
.maxClaimedSegments(maxClaimedSegments)
.initialSegmentCount(builder.initialSegmentCount)
.initialToken(initialToken)
.build();
}

@StartHandler(phase = Phase.INBOUND_EVENT_CONNECTORS)
@Override
public void start() {
logger.info("Starting PooledStreamingEventProcessor [{}].", name);
initializeTokenStore();
coordinator.start();
}

private void initializeTokenStore() {
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
}
});
}

@Override
public void shutDown() {
shutdownAsync().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,20 @@ void executeWithRetryImmediatelyStopsOnOther() {
assertEquals(1, retryCounter.get());
}
}

@Test
void executeUntilTrueRetries(){
AtomicLong retryCounter = new AtomicLong();
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 1, 10L, 10L);
assertEquals(2, retryCounter.get());
}

@Test
void executeUntilTrueThrowsWhenMaxRetriesReached(){
AtomicLong retryCounter = new AtomicLong();
assertThrows(ProcessRetriesExhaustedException.class, () ->
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 100, 1L, 10L)
);
assertEquals(10, retryCounter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CoordinatorTest {
private final Segment SEGMENT_ZERO = computeSegment(0);
private final int SEGMENT_ID = 0;
private final int[] SEGMENT_IDS = {SEGMENT_ID};
private final int[] EMPTY_SEGMENT_IDS = {};

private final TokenStore tokenStore = mock(TokenStore.class);
private final ScheduledThreadPoolExecutor executorService = mock(ScheduledThreadPoolExecutor.class);
Expand Down Expand Up @@ -103,6 +104,26 @@ void testIfCoordinationTaskRescheduledAfterTokenReleaseClaimFails() {

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
// should be zero since we mock there already is a segment
verify(tokenStore, times(0)).initializeTokenSegments(anyString(), anyInt(), any(TrackingToken.class));
}

@Test
void testIfCoordinationTaskInitializesTokenStoreWhenNeeded() {
//arrange
final GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(0);

doReturn(EMPTY_SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
doReturn(token).when(tokenStore).fetchToken(eq(PROCESSOR_NAME), anyInt());
doReturn(SEGMENT_ZERO).when(workPackage).segment();
doAnswer(runTaskSync()).when(executorService).submit(any(Runnable.class));

//act
testSubject.start();

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
verify(tokenStore, times(1)).initializeTokenSegments(anyString(), anyInt(), isNull());
}

@SuppressWarnings("rawtypes") // Mockito cannot deal with the wildcard generics of the TrackedEventMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,32 @@ void tearDown() {
}

@Test
void testStartContinuesWhenTokenInitializationFails() {
void testRetriesWhenTokenInitializationInitiallyFails() {
InMemoryTokenStore spy = spy(tokenStore);
setTestSubject(createTestSubject(b -> b.tokenStore(spy)));

doThrow(new RuntimeException("Simulated failure")).doCallRealMethod()
.when(spy)
.initializeTokenSegments(any(), anyInt(), any());

List<EventMessage<Integer>> events = IntStream.range(0, 100)
.mapToObj(GenericEventMessage::new)
.collect(Collectors.toList());
events.forEach(stubMessageSource::publishMessage);
mockEventHandlerInvoker();
testSubject.start();

assertTrue(testSubject.isRunning());

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(8, testSubject.processingStatus().size()));
assertWithin(2, TimeUnit.SECONDS, () -> {
long nonNullTokens = IntStream.range(0, 8)
.mapToObj(i -> tokenStore.fetchToken(PROCESSOR_NAME, i))
.filter(Objects::nonNull)
.count();
assertEquals(8, nonNullTokens);
});
assertEquals(8, testSubject.processingStatus().size());
}

@Test
Expand Down Expand Up @@ -714,14 +729,14 @@ void testResetTokens() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens();

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -739,14 +754,14 @@ void testResetTokensWithContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -765,14 +780,14 @@ void testResetTokensFromDefinedPosition() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken);

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand All @@ -791,14 +806,14 @@ void testResetTokensFromDefinedPositionAndWithResetContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken, expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand Down

0 comments on commit 7dc550f

Please sign in to comment.