From 1fcf92b7c83df901351c4af9b34cf90bc82da37d Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 8 Jan 2018 11:30:08 +0100 Subject: [PATCH 1/2] [FLINK-8385] [checkpointing] Suppress logging of expected exception during snapshot cancellation. --- .../apache/flink/runtime/state/StateUtil.java | 20 ++++++++++++++++--- .../api/operators/OperatorSnapshotResult.java | 4 +--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index 09d195add1fde..f129c318110f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -21,6 +21,11 @@ import org.apache.flink.util.FutureUtil; import org.apache.flink.util.LambdaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; /** @@ -28,6 +33,8 @@ */ public class StateUtil { + private static final Logger LOG = LoggerFactory.getLogger(StateUtil.class); + private StateUtil() { throw new AssertionError(); } @@ -63,10 +70,17 @@ public static void bestEffortDiscardAllStateObjects( public static void discardStateFuture(RunnableFuture stateFuture) throws Exception { if (null != stateFuture) { if (!stateFuture.cancel(true)) { - StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture); - if (null != stateObject) { - stateObject.discardState(); + try { + // We attempt to get a result, in case the future completed before cancellation. + StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture); + + if (null != stateObject) { + stateObject.discardState(); + } + } catch (CancellationException | ExecutionException ex) { + LOG.debug("Cancelled execution of snapshot future runnable. 
Cancellation produced the following " + + "exception, which is expected and can be ignored.", ex); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 8aa76a5f852f7..8c05ae9b9e88a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -88,9 +88,7 @@ public void cancel() throws Exception { try { StateUtil.discardStateFuture(getKeyedStateManagedFuture()); } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed( - new Exception("Could not properly cancel managed keyed state future.", e), - exception); + exception = new Exception("Could not properly cancel managed keyed state future.", e); } try { From ef519e121896bff6617fd4030cb9468dece951c9 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 8 Jan 2018 11:31:57 +0100 Subject: [PATCH 2/2] [FLINK-8385] [checkpointing] Avoid RejectedExecutionException in SharedStateRegistry during disposal from async Zookeeper calls. 
--- .../flink/runtime/state/SharedStateRegistry.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 458c695590124..664631bbc0fd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; /** * This registry manages state that is shared across (incremental) checkpoints, and is responsible @@ -194,8 +195,17 @@ private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { // We do the small optimization to not issue discards for placeholders, which are NOPs. if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { LOG.trace("Scheduled delete of state handle {}.", streamStateHandle); - asyncDisposalExecutor.execute( - new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); + AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(streamStateHandle); + try { + asyncDisposalExecutor.execute(asyncDisposalRunnable); + } catch (RejectedExecutionException ex) { + // TODO This is a temporary fix for a problem during ZooKeeperCompletedCheckpointStore#shutdown: + // Disposal is issued in another async thread and the shutdown proceeds to close the I/O Executor pool. + // This leads to RejectedExecutionException once the async deletes are triggered by ZK. We need to + // wait for all pending ZK deletes before closing the I/O Executor pool. We can simply call #run() + // because we are already in the async ZK thread that disposes the handles. + asyncDisposalRunnable.run(); + } } }