From 3c6f0f3377abb35ed5b04fdcdb599705bc0af162 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 22 Nov 2017 17:52:35 +0100 Subject: [PATCH] [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown --- .../runtime/tasks/ProcessingTimeService.java | 12 ++++ .../streaming/runtime/tasks/StreamTask.java | 15 ++++- .../tasks/SystemProcessingTimeService.java | 6 ++ .../tasks/TestProcessingTimeService.java | 6 ++ .../SystemProcessingTimeServiceTest.java | 65 +++++++++++++++++++ 5 files changed, 102 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index b2382529e1c12..a4586d435477e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Defines the current processing time and handles all related actions, @@ -93,4 +94,15 @@ public abstract class ProcessingTimeService { * will result in a hard exception. */ public abstract void shutdownService(); + + /** + * Shuts down and cleans up the timer service provider hard and immediately. This does wait + * for all timers to complete or until the time limit is exceeded. Any call to + * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. + * @param time time to wait for termination. + * @param timeUnit time unit of parameter time. + * @return {@code true} if this timer service and all pending timers are terminated and + * {@code false} if the timeout elapsed before this happened. 
+ */ + public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 36e67485650da..3bc5a681e5dc4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -69,6 +69,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -122,6 +123,8 @@ public abstract class StreamTask> /** The logger used by the StreamTask and its subclasses. */ private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); + private static final long TIMER_SERVICE_TERMINATION_AWAIT_MS = 7500L; + // ------------------------------------------------------------------------ /** @@ -319,9 +322,17 @@ public final void invoke() throws Exception { isRunning = false; // stop all timers and threads - if (timerService != null) { + if (timerService != null && !timerService.isTerminated()) { try { - timerService.shutdownService(); + + // wait for a reasonable time for all pending timer threads to finish + boolean timerShutdownComplete = + timerService.shutdownAndAwaitPending(TIMER_SERVICE_TERMINATION_AWAIT_MS, TimeUnit.MILLISECONDS); + + if (!timerShutdownComplete) { + LOG.warn("Timer service shutdown exceeded time limit while waiting for pending timers. 
" + + "Will continue with shutdown procedure."); + } } catch (Throwable t) { // catch and log the exception to not replace the original exception diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 71bfdf6f7d5f6..be8b23c7ee861 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -191,6 +191,12 @@ public void shutdownService() { } } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return timerService.awaitTermination(time, timeUnit); + } + // safety net to destroy the thread pool @Override protected void finalize() throws Throwable { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 080eeb55ddc1d..2081f193cfcb1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -134,6 +134,12 @@ public void shutdownService() { this.isTerminated = true; } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return true; + } + public int getNumActiveTimers() { int count = 0; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 4c105d3c903a1..01fd7789b4682 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -22,11 +22,13 @@ import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -442,4 +444,67 @@ public void onProcessingTime(long timestamp) throws Exception { latch.await(); assertTrue(exceptionWasThrown.get()); } + + @Test + public void testShutdownAndWaitPending() { + + final Object lock = new Object(); + final OneShotLatch waitUntilTimerStarted = new OneShotLatch(); + final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch(); + final OneShotLatch blockUntilTriggered = new OneShotLatch(); + final AtomicBoolean check = new AtomicBoolean(true); + + final SystemProcessingTimeService timeService = new SystemProcessingTimeService( + (message, exception) -> { + }, + lock); + + timeService.scheduleAtFixedRate( + timestamp -> { + + waitUntilTimerStarted.trigger(); + + try { + blockUntilTerminationInterrupts.await(); + check.set(false); + } catch (InterruptedException ignore) { + } + + try { + blockUntilTriggered.await(); + } catch (InterruptedException ignore) { + check.set(false); + } + }, + 0L, + 10L); + + try { + waitUntilTimerStarted.await(); + } catch (InterruptedException e) { + 
Assert.fail(); + } + + Assert.assertFalse(timeService.isTerminated()); + + // Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out. + try { + Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + // Let the timer proceed. + blockUntilTriggered.trigger(); + + // Now we should succeed in terminating the timer. + try { + Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + Assert.assertTrue(check.get()); + Assert.assertTrue(timeService.isTerminated()); + } }