Skip to content

Commit

Permalink
Ensure threads are interrupted after wait timeout
Browse files Browse the repository at this point in the history
Trigger an interrupt() when a thread fails to complete within the configured timeout.

(cherry picked from commit 696d8f5)
  • Loading branch information
abuijze authored and smcvb committed Jul 14, 2022
1 parent 6445c1d commit 8987e73
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1917,29 +1917,30 @@ void testShutdownTerminatesWorkersAfterConfiguredWorkerTerminationTimeout() thro
int numberOfEvents = 5;
List<String> handled = new CopyOnWriteArrayList<>();
int testWorkerTerminationTimeout = 50;

List<Thread> createdThreads = new CopyOnWriteArrayList<>();
// A higher event availability timeout will block a worker thread that should shut down
initProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2)
.andInitialSegmentsCount(2)
.andBatchSize(100)
.andThreadFactory(n -> r -> {
Thread thread = new Thread(r, n);
createdThreads.add(thread);
return thread;
})
.andWorkerTerminationTimeout(testWorkerTerminationTimeout)
.andEventAvailabilityTimeout(20, TimeUnit.SECONDS));

doAnswer(i -> {
EventMessage<?> event = i.getArgument(0);
handled.add(event.getIdentifier());
return null;
}).when(mockHandler).handle(any());

// ensure some events have been handled by the TEP
eventBus.publish(createEvents(numberOfEvents));
testSubject.start();
assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(numberOfEvents, handled.size()));

// sleep a little to reach the EventAvailabilityTimeout usage on the Event Stream
Thread.sleep(500);

assertEquals(2, createdThreads.size());

CompletableFuture<Void> result = testSubject.shutdownAsync();
assertWithin(testWorkerTerminationTimeout * 2, TimeUnit.MILLISECONDS, () -> assertTrue(result.isDone()));
assertFalse(createdThreads.get(0).isAlive());
assertFalse(createdThreads.get(1).isAlive());
}

private void waitForStatus(String description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,10 @@ private void processBatch(Segment segment, BlockingStream<TrackedEventMessage<?>
}
checkSegmentCaughtUp(segment, eventStream);
} catch (InterruptedException e) {
logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
this.shutDown();
if (isRunning()) {
logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
setShutdownState();
}
Thread.currentThread().interrupt();
}
}
Expand Down Expand Up @@ -689,7 +691,12 @@ private CompletableFuture<Void> awaitTermination() {
.stream()
.map(worker -> CompletableFuture.runAsync(() -> {
try {
worker.getValue().join(workerTerminationTimeout);
Thread workerThread = worker.getValue();
workerThread.join(workerTerminationTimeout);
if (workerThread.isAlive()) {
workerThread.interrupt();
workerThread.join(workerTerminationTimeout);
}
} catch (InterruptedException e) {
logger.info(
"Thread was interrupted waiting for TrackingProcessor Worker '{}' shutdown.",
Expand Down

0 comments on commit 8987e73

Please sign in to comment.