Skip to content

Commit

Permalink
Merge pull request #2041 from AxonFramework/enhancement/tep-worker-in…
Browse files Browse the repository at this point in the history
…terruption

Introduce mechanism to interrupt `TrackingEventProcessor` worker threads
  • Loading branch information
smcvb committed Dec 21, 2021
2 parents e8b2d38 + 696d8f5 commit 07b88d2
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,37 @@ void testIsReplayingWhenNotCaughtUp() throws Exception {
);
}

@Test
void testShutdownTerminatesWorkersAfterConfiguredWorkerTerminationTimeout() throws Exception {
int numberOfEvents = 5;
List<String> handled = new CopyOnWriteArrayList<>();
int testWorkerTerminationTimeout = 50;
List<Thread> createdThreads = new CopyOnWriteArrayList<>();
// A higher event availability timeout will block a worker thread that should shut down
initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2)
.andInitialSegmentsCount(2)
.andBatchSize(100)
.andThreadFactory(n -> r -> {
Thread thread = new Thread(r, n);
createdThreads.add(thread);
return thread;
})
.andWorkerTerminationTimeout(testWorkerTerminationTimeout)
.andEventAvailabilityTimeout(20, TimeUnit.SECONDS));

testSubject.start();

// sleep a little to reach the EventAvailabilityTimeout usage on the Event Stream
Thread.sleep(500);

assertEquals(2, createdThreads.size());

CompletableFuture<Void> result = testSubject.shutdownAsync();
assertWithin(testWorkerTerminationTimeout * 2, TimeUnit.MILLISECONDS, () -> assertTrue(result.isDone()));
assertFalse(createdThreads.get(0).isAlive());
assertFalse(createdThreads.get(1).isAlive());
}

private void waitForStatus(String description,
long time,
TimeUnit unit,
Expand Down

0 comments on commit 07b88d2

Please sign in to comment.