Skip to content

Commit

Permalink
Retry to initialize the token store on exception for PSEP.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Jul 7, 2022
1 parent c636717 commit b04aca6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 3 deletions.
20 changes: 20 additions & 0 deletions messaging/src/main/java/org/axonframework/common/ProcessUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.axonframework.common;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -49,4 +50,23 @@ public static void executeWithRetry(Runnable runnable, Predicate<RuntimeExceptio

throw lastException;
}

/**
* Executes an action, with potential retry in case the result is false. Exception handling should be taken care of
* within the action if needed.
*
* @param runnable action to execute
* @param retryInterval time to wait between retries of the action
*/
public static void executeUntilTrue(BooleanSupplier runnable, long retryInterval) {
boolean result = runnable.getAsBoolean();
while (!result) {
try {
Thread.sleep(retryInterval);
result = runnable.getAsBoolean();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -63,6 +64,8 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertStrictPositive;
import static org.axonframework.common.ProcessUtils.executeUntilTrue;
import static org.axonframework.common.ProcessUtils.executeWithRetry;

/**
* A {@link StreamingEventProcessor} implementation which pools it's resources to enhance processing speed. It utilizes
Expand Down Expand Up @@ -190,11 +193,12 @@ protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder bu
@Override
public void start() {
logger.info("Starting PooledStreamingEventProcessor [{}].", name);
initializeTokenStore();
executeUntilTrue(this::initializeTokenStore, 100L);
coordinator.start();
}

private void initializeTokenStore() {
private boolean initializeTokenStore() {
AtomicBoolean result = new AtomicBoolean(true);
transactionManager.executeInTransaction(() -> {
int[] segments = tokenStore.fetchSegments(name);
try {
Expand All @@ -205,8 +209,10 @@ private void initializeTokenStore() {
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
result.set(false);
}
});
return result.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,11 @@ void executeWithRetryImmediatelyStopsOnOther() {
assertEquals(1, retryCounter.get());
}
}

@Test
void executeUntilTrueRetries(){
AtomicLong retryCounter = new AtomicLong();
ProcessUtils.executeUntilTrue(() -> retryCounter.getAndIncrement() >= 1, 10L);
assertEquals(2, retryCounter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,33 @@ void tearDown() {
}

@Test
void testStartContinuesWhenTokenInitializationFails() {
void testRetriesWhenTokenInitializationInitiallyFails() {
InMemoryTokenStore spy = spy(tokenStore);
setTestSubject(createTestSubject(b -> b.tokenStore(spy)));

doThrow(new RuntimeException("Simulated failure")).doCallRealMethod()
.when(spy)
.initializeTokenSegments(any(), anyInt(), any());

List<EventMessage<Integer>> events = IntStream.range(0, 100)
.mapToObj(GenericEventMessage::new)
.collect(Collectors.toList());
events.forEach(stubMessageSource::publishMessage);
mockEventHandlerInvoker();

testSubject.start();

assertTrue(testSubject.isRunning());

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(8, testSubject.processingStatus().size()));
assertWithin(2, TimeUnit.SECONDS, () -> {
long nonNullTokens = IntStream.range(0, 8)
.mapToObj(i -> tokenStore.fetchToken(PROCESSOR_NAME, i))
.filter(Objects::nonNull)
.count();
assertEquals(8, nonNullTokens);
});
assertEquals(8, testSubject.processingStatus().size());
}

@Test
Expand Down

0 comments on commit b04aca6

Please sign in to comment.