Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry to initialize the token store correctly on exception for PSEP. #2277

Merged
merged 2 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.common;

/**
* Exception indicating that an action was retried a maximum times, without a good result.
*
* @author Gerard Klijs
* @since 4.5.13
*/
public class ProcessRetriesExhaustedException extends AxonException {

private static final long serialVersionUID = 6920545831431694106L;

/**
* Initializes the exception using the given {@code message}.
*
* @param message The message describing the exception
*/
public ProcessRetriesExhaustedException(String message) {
super(message);
}

/**
* Initializes the exception using the given {@code message} and {@code cause}.
*
* @param message The message describing the exception
* @param cause The underlying cause of the exception
*/
public ProcessRetriesExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
28 changes: 28 additions & 0 deletions messaging/src/main/java/org/axonframework/common/ProcessUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.axonframework.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -49,4 +51,30 @@ public static void executeWithRetry(Runnable runnable, Predicate<RuntimeExceptio

throw lastException;
}

/**
* Executes an action, with potential retry in case the result is false. Exception handling should be taken care of
* within the action if needed.
*
* @param runnable action to execute, will be executed till the result is true, or max tries is reached
* @param retryInterval time to wait between retries of the action
* @param maxTries maximum number of times the action is invoked
*/
public static void executeUntilTrue(BooleanSupplier runnable, long retryInterval, long maxTries) {
AtomicLong totalTriesCounter = new AtomicLong();
boolean result = runnable.getAsBoolean();
while (!result) {
if (totalTriesCounter.incrementAndGet() >= maxTries){
throw new ProcessRetriesExhaustedException(String.format(
"Tried invoking the action for %d times, without the result being true",
maxTries));
}
try {
Thread.sleep(retryInterval);
result = runnable.getAsBoolean();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.axonframework.common.ProcessUtils.executeUntilTrue;
import static org.axonframework.common.io.IOUtils.closeQuietly;

/**
Expand Down Expand Up @@ -89,6 +91,8 @@ class Coordinator {
private final long claimExtensionThreshold;
private final Clock clock;
private final int maxClaimedSegments;
private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
gklijs marked this conversation as resolved.
Show resolved Hide resolved

private final Map<Integer, WorkPackage> workPackages = new ConcurrentHashMap<>();
private final AtomicReference<RunState> runState;
Expand Down Expand Up @@ -121,6 +125,8 @@ private Coordinator(Builder builder) {
this.claimExtensionThreshold = builder.claimExtensionThreshold;
this.clock = builder.clock;
this.maxClaimedSegments = builder.maxClaimedSegments;
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.runState = new AtomicReference<>(RunState.initial(builder.shutdownAction));
}

Expand All @@ -133,6 +139,7 @@ public void start() {
if (newState.wasStarted()) {
logger.debug("Starting Coordinator for Processor [{}].", name);
try {
executeUntilTrue(Coordinator.this::initializeTokenStore, 100L, 30L);
CoordinationTask task = new CoordinationTask();
executorService.submit(task);
this.coordinationTask.set(task);
Expand Down Expand Up @@ -253,6 +260,24 @@ public CompletableFuture<Boolean> mergeSegment(int segmentId) {
return result;
}

private boolean initializeTokenStore() {
AtomicBoolean tokenStoreInitialized = new AtomicBoolean(false);
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
tokenStoreInitialized.set(true);
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
}
});
return tokenStoreInitialized.get();
}

/**
* Status holder for this service. Defines whether it is running, has been started (to ensure double {@link
* #start()} invocations do not restart this coordinator) and maintains a shutdown handler to complete
Expand Down Expand Up @@ -353,6 +378,9 @@ static class Builder {
private long claimExtensionThreshold = 5000;
private Clock clock = GenericEventMessage.clock;
private int maxClaimedSegments;
private int initialSegmentCount = 16;
private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken =
StreamableMessageSource::createTailToken;
private Runnable shutdownAction = () -> {
};

Expand Down Expand Up @@ -510,6 +538,33 @@ Builder maxClaimedSegments(int maxClaimedSegments) {
return this;
}

/**
* Sets the initial segment count used to create segments on start up. Defaults to 16.
*
* @param initialSegmentCount an {@code int} specifying the initial segment count used to create segments on
* start up
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialSegmentCount(int initialSegmentCount) {
this.initialSegmentCount = initialSegmentCount;
return this;
}

/**
* Specifies the {@link Function} used to generate the initial {@link TrackingToken}s. Defaults to
* {@link StreamableMessageSource::createTailToken}
*
* @param initialToken a {@link Function} generating the initial {@link TrackingToken} based on a given
* {@link StreamableMessageSource}
* @return the current Builder instance, for fluent interfacing
*/
public Builder initialToken(
Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken
) {
this.initialToken = initialToken;
return this;
}

/**
* Registers an action to perform when the coordinator shuts down. Will override any previously registered
* actions. Defaults to a no-op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class PooledStreamingEventProcessor extends AbstractEventProcessor implem
private final TransactionManager transactionManager;
private final ScheduledExecutorService workerExecutor;
private final Coordinator coordinator;
private final int initialSegmentCount;
private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
private final long tokenClaimInterval;
private final int maxClaimedSegments;
Expand Down Expand Up @@ -161,7 +160,6 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
this.tokenStore = builder.tokenStore;
this.transactionManager = builder.transactionManager;
this.workerExecutor = builder.workerExecutorBuilder.apply(name);
this.initialSegmentCount = builder.initialSegmentCount;
this.initialToken = builder.initialToken;
this.tokenClaimInterval = builder.tokenClaimInterval;
this.maxClaimedSegments = builder.maxClaimedSegments;
Expand All @@ -183,32 +181,18 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
.claimExtensionThreshold(claimExtensionThreshold)
.clock(clock)
.maxClaimedSegments(maxClaimedSegments)
.initialSegmentCount(builder.initialSegmentCount)
.initialToken(initialToken)
.build();
}

@StartHandler(phase = Phase.INBOUND_EVENT_CONNECTORS)
@Override
public void start() {
logger.info("Starting PooledStreamingEventProcessor [{}].", name);
initializeTokenStore();
coordinator.start();
}

private void initializeTokenStore() {
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
if (segments == null || segments.length == 0) {
logger.info("Initializing segments for processor [{}] ({} segments)", name, initialSegmentCount);
tokenStore.initializeTokenSegments(name, initialSegmentCount, initialToken.apply(messageSource));
}
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
}
});
}

@Override
public void shutDown() {
shutdownAsync().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,20 @@ void executeWithRetryImmediatelyStopsOnOther() {
assertEquals(1, retryCounter.get());
}
}

@Test
void executeUntilTrueRetries(){
AtomicLong retryCounter = new AtomicLong();
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 1, 10L, 10L);
assertEquals(2, retryCounter.get());
}

@Test
void executeUntilTrueThrowsWhenMaxRetriesReached(){
AtomicLong retryCounter = new AtomicLong();
assertThrows(ProcessRetriesExhaustedException.class, () ->
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 100, 1L, 10L)
);
assertEquals(10, retryCounter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CoordinatorTest {
private final Segment SEGMENT_ZERO = computeSegment(0);
private final int SEGMENT_ID = 0;
private final int[] SEGMENT_IDS = {SEGMENT_ID};
private final int[] EMPTY_SEGMENT_IDS = {};

private final TokenStore tokenStore = mock(TokenStore.class);
private final ScheduledThreadPoolExecutor executorService = mock(ScheduledThreadPoolExecutor.class);
Expand Down Expand Up @@ -103,6 +104,26 @@ void testIfCoordinationTaskRescheduledAfterTokenReleaseClaimFails() {

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
// should be zero since we mock there already is a segment
verify(tokenStore, times(0)).initializeTokenSegments(anyString(), anyInt(), any(TrackingToken.class));
}

@Test
void testIfCoordinationTaskInitializesTokenStoreWhenNeeded() {
//arrange
final GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(0);

doReturn(EMPTY_SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
doReturn(token).when(tokenStore).fetchToken(eq(PROCESSOR_NAME), anyInt());
doReturn(SEGMENT_ZERO).when(workPackage).segment();
doAnswer(runTaskSync()).when(executorService).submit(any(Runnable.class));

//act
testSubject.start();

//asserts
verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
verify(tokenStore, times(1)).initializeTokenSegments(anyString(), anyInt(), isNull());
}

@SuppressWarnings("rawtypes") // Mockito cannot deal with the wildcard generics of the TrackedEventMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,32 @@ void tearDown() {
}

@Test
void testStartContinuesWhenTokenInitializationFails() {
void testRetriesWhenTokenInitializationInitiallyFails() {
gklijs marked this conversation as resolved.
Show resolved Hide resolved
InMemoryTokenStore spy = spy(tokenStore);
setTestSubject(createTestSubject(b -> b.tokenStore(spy)));

doThrow(new RuntimeException("Simulated failure")).doCallRealMethod()
.when(spy)
.initializeTokenSegments(any(), anyInt(), any());

List<EventMessage<Integer>> events = IntStream.range(0, 100)
.mapToObj(GenericEventMessage::new)
.collect(Collectors.toList());
events.forEach(stubMessageSource::publishMessage);
mockEventHandlerInvoker();
testSubject.start();

assertTrue(testSubject.isRunning());

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(8, testSubject.processingStatus().size()));
assertWithin(2, TimeUnit.SECONDS, () -> {
long nonNullTokens = IntStream.range(0, 8)
.mapToObj(i -> tokenStore.fetchToken(PROCESSOR_NAME, i))
.filter(Objects::nonNull)
.count();
assertEquals(8, nonNullTokens);
});
assertEquals(8, testSubject.processingStatus().size());
}

@Test
Expand Down Expand Up @@ -714,14 +729,14 @@ void testResetTokens() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens();

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -739,14 +754,14 @@ void testResetTokensWithContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
// The token stays the same, as the original and token after reset are identical.
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
Expand All @@ -765,14 +780,14 @@ void testResetTokensFromDefinedPosition() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken);

verify(stubEventHandler).performReset(null);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand All @@ -791,14 +806,14 @@ void testResetTokensFromDefinedPositionAndWithResetContext() {

// Start and stop the processor to initialize the tracking tokens
testSubject.start();
assertWithin(2, TimeUnit.SECONDS, () -> assertEquals(tokenStore.fetchSegments(PROCESSOR_NAME).length, expectedSegmentCount));
testSubject.shutDown();

testSubject.resetTokens(StreamableMessageSource::createTailToken, expectedContext);

verify(stubEventHandler).performReset(expectedContext);

int[] segments = tokenStore.fetchSegments(PROCESSOR_NAME);
assertEquals(expectedSegmentCount, segments.length);
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
assertEquals(expectedToken, tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
}
Expand Down