From 3c6f0f3377abb35ed5b04fdcdb599705bc0af162 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 22 Nov 2017 17:52:35 +0100 Subject: [PATCH 1/2] [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown --- .../runtime/tasks/ProcessingTimeService.java | 12 ++++ .../streaming/runtime/tasks/StreamTask.java | 15 ++++- .../tasks/SystemProcessingTimeService.java | 6 ++ .../tasks/TestProcessingTimeService.java | 6 ++ .../SystemProcessingTimeServiceTest.java | 65 +++++++++++++++++++ 5 files changed, 102 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index b2382529e1c12..a4586d435477e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Defines the current processing time and handles all related actions, @@ -93,4 +94,15 @@ public abstract class ProcessingTimeService { * will result in a hard exception. */ public abstract void shutdownService(); + + /** + * Shuts down and clean up the timer service provider hard and immediately. This does wait + * for all timer to complete or until the time limit is exceeded. Any call to + * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. + * @param time time to wait for termination. + * @param timeUnit time unit of parameter time. 
+ * @return {@code true} if this timer service and all pending timers are terminated and + * {@code false} if the timeout elapsed before this happened. + */ + public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 36e67485650da..3bc5a681e5dc4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -69,6 +69,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -122,6 +123,8 @@ public abstract class StreamTask> /** The logger used by the StreamTask and its subclasses. */ private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); + private static final long TIMER_SERVICE_TERMINATION_AWAIT_MS = 7500L; + // ------------------------------------------------------------------------ /** @@ -319,9 +322,17 @@ public final void invoke() throws Exception { isRunning = false; // stop all timers and threads - if (timerService != null) { + if (timerService != null && !timerService.isTerminated()) { try { - timerService.shutdownService(); + + // wait for a reasonable time for all pending timer threads to finish + boolean timerShutdownComplete = + timerService.shutdownAndAwaitPending(TIMER_SERVICE_TERMINATION_AWAIT_MS, TimeUnit.MILLISECONDS); + + if (!timerShutdownComplete) { + LOG.warn("Timer service shutdown exceeded time limit while waiting for pending timers. 
" + + "Will continue with shutdown procedure."); + } } catch (Throwable t) { // catch and log the exception to not replace the original exception diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 71bfdf6f7d5f6..be8b23c7ee861 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -191,6 +191,12 @@ public void shutdownService() { } } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return timerService.awaitTermination(time, timeUnit); + } + // safety net to destroy the thread pool @Override protected void finalize() throws Throwable { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 080eeb55ddc1d..2081f193cfcb1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -134,6 +134,12 @@ public void shutdownService() { this.isTerminated = true; } + @Override + public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { + shutdownService(); + return true; + } + public int getNumActiveTimers() { int count = 0; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 4c105d3c903a1..01fd7789b4682 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -22,11 +22,13 @@ import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -442,4 +444,67 @@ public void onProcessingTime(long timestamp) throws Exception { latch.await(); assertTrue(exceptionWasThrown.get()); } + + @Test + public void testShutdownAndWaitPending() { + + final Object lock = new Object(); + final OneShotLatch waitUntilTimerStarted = new OneShotLatch(); + final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch(); + final OneShotLatch blockUntilTriggered = new OneShotLatch(); + final AtomicBoolean check = new AtomicBoolean(true); + + final SystemProcessingTimeService timeService = new SystemProcessingTimeService( + (message, exception) -> { + }, + lock); + + timeService.scheduleAtFixedRate( + timestamp -> { + + waitUntilTimerStarted.trigger(); + + try { + blockUntilTerminationInterrupts.await(); + check.set(false); + } catch (InterruptedException ignore) { + } + + try { + blockUntilTriggered.await(); + } catch (InterruptedException ignore) { + check.set(false); + } + }, + 0L, + 10L); + + try { + waitUntilTimerStarted.await(); + } catch (InterruptedException e) { + 
Assert.fail(); + } + + Assert.assertFalse(timeService.isTerminated()); + + // Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out. + try { + Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + // Let the timer proceed. + blockUntilTriggered.trigger(); + + // Now we should succeed in terminating the timer. + try { + Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Unexpected interruption."); + } + + Assert.assertTrue(check.get()); + Assert.assertTrue(timeService.isTerminated()); + } } From 8f5dde095d466328c18877a55054434afc2d70a5 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 23 Nov 2017 12:24:56 +0100 Subject: [PATCH 2/2] cluster-configurable timeout --- .../configuration/TimerServiceOptions.java | 38 +++++++++++++++++++ .../runtime/tasks/ProcessingTimeService.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 13 ++++--- 3 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java new file mode 100644 index 0000000000000..835adce4035c0 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Timer service configuration options. + */ +@PublicEvolving +public class TimerServiceOptions { + + /** + * Timeout, in milliseconds, that the stream task waits for the {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService} + * to finish all pending timer threads when it performs a failover shutdown (default: 7500 ms). See FLINK-5465. + */ + public static final ConfigOption TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions + .key("timerservice.exceptional.shutdown.timeout") + .defaultValue(7500L); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index a4586d435477e..251629987b365 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -97,7 +97,7 @@ public abstract class ProcessingTimeService { /** * Shuts down and clean up the timer service provider hard and immediately. 
This does wait - * for all timer to complete or until the time limit is exceeded. Any call to + * for all timers to complete or until the time limit is exceeded. Any call to * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. * @param time time to wait for termination. * @param timeUnit time unit of parameter time. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 3bc5a681e5dc4..eff8a291fb791 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.configuration.TimerServiceOptions; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; @@ -123,8 +124,6 @@ public abstract class StreamTask> /** The logger used by the StreamTask and its subclasses. 
*/ private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); - private static final long TIMER_SERVICE_TERMINATION_AWAIT_MS = 7500L; - // ------------------------------------------------------------------------ /** @@ -221,7 +220,6 @@ public final void invoke() throws Exception { LOG.debug("Initializing {}.", getName()); asyncOperationsThreadPool = Executors.newCachedThreadPool(); - configuration = new StreamConfig(getTaskConfiguration()); CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory(); @@ -325,13 +323,16 @@ public final void invoke() throws Exception { if (timerService != null && !timerService.isTerminated()) { try { + final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration(). + getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS); + // wait for a reasonable time for all pending timer threads to finish boolean timerShutdownComplete = - timerService.shutdownAndAwaitPending(TIMER_SERVICE_TERMINATION_AWAIT_MS, TimeUnit.MILLISECONDS); + timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS); if (!timerShutdownComplete) { - LOG.warn("Timer service shutdown exceeded time limit while waiting for pending timers. " + - "Will continue with shutdown procedure."); + LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " + + "timers. Will continue with shutdown procedure.", timeoutMs); } } catch (Throwable t) {