Skip to content

Commit

Permalink
Retry to initialize the token store on exception for PSEP.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Jul 7, 2022
1 parent c636717 commit 609d63c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ private void initializeTokenStore() {
} catch (Exception e) {
logger.info("Error while initializing the Token Store. " +
"This may simply indicate concurrent attempts to initialize.", e);
initializeTokenStore();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,36 @@ void testStartContinuesWhenTokenInitializationFails() {
assertTrue(testSubject.isRunning());
}

@Test
void testRetriesWhenTokenInitializationInitiallyFails() {
InMemoryTokenStore spy = spy(tokenStore);
setTestSubject(createTestSubject(b -> b.tokenStore(spy)));

doThrow(new RuntimeException("Simulated failure")).doCallRealMethod()
.when(spy)
.initializeTokenSegments(any(), anyInt(), any());

List<EventMessage<Integer>> events = IntStream.range(0, 100)
.mapToObj(GenericEventMessage::new)
.collect(Collectors.toList());
events.forEach(stubMessageSource::publishMessage);
mockEventHandlerInvoker();

testSubject.start();

assertTrue(testSubject.isRunning());

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(8, testSubject.processingStatus().size()));
assertWithin(2, TimeUnit.SECONDS, () -> {
long nonNullTokens = IntStream.range(0, 8)
.mapToObj(i -> tokenStore.fetchToken(PROCESSOR_NAME, i))
.filter(Objects::nonNull)
.count();
assertEquals(8, nonNullTokens);
});
assertEquals(8, testSubject.processingStatus().size());
}

@Test
void testStartShutsDownImmediatelyIfCoordinatorExecutorThrowsAnException() {
ScheduledExecutorService spiedCoordinatorExecutor = spy(coordinatorExecutor);
Expand Down

0 comments on commit 609d63c

Please sign in to comment.