From 4fe2e18b7df37ca25f71a274d94b7c14a540f698 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Wed, 10 Jun 2015 15:33:58 +0200 Subject: [PATCH] [streaming] Allow force-enabling checkpoints for iterative jobs --- docs/apis/streaming_guide.md | 14 +++++++-- .../StreamExecutionEnvironment.java | 31 +++++++++++++++++++ .../streaming/api/graph/StreamGraph.java | 11 +++++-- .../api/graph/StreamingJobGraphGenerator.java | 1 - .../runtime/tasks/StreamIterationHead.java | 2 ++ .../runtime/tasks/StreamIterationTail.java | 3 ++ .../streaming/runtime/tasks/StreamTask.java | 2 +- .../flink/streaming/api/IterateTest.java | 16 ++++++++++ .../scala/StreamExecutionEnvironment.scala | 19 ++++++++++++ 9 files changed, 93 insertions(+), 6 deletions(-) diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index de6340e00e3b5..0a7a486022f4d 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -1188,7 +1188,15 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu Stateful computation ------------ -Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is sotred in the state. For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usacase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a source for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`. For example let us write a reduce function that besides summing the data it also counts have many elements it has seen. +Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is stored in the state when the sources are stateful as well and checkpoint their current offset. The `PersistentKafkaSource` provides this stateful functionality for example. + +For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usecase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a function for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`. + +Checkpointing can be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval. + +By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on any flink-supported file system (such as HDFS or Tachyon) which can be set in the flink-conf.yaml. + +For example let us write a reduce function that besides summing the data it also counts have many elements it has seen. {% highlight java %} public class CounterSum implements ReduceFunction, CheckpointedAsynchronously { @@ -1257,7 +1265,9 @@ public static class CounterSource implements SourceFunction, CheckpointedA } {% endhighlight %} -Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface. +Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface. + +Fink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`. [Back to top](#top) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f009871fd0da5..b43e123eb9689 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -241,6 +241,37 @@ public StreamExecutionEnvironment enableCheckpointing(long interval) { streamGraph.setCheckpointingInterval(interval); return this; } + + /** + * Method for force-enabling fault-tolerance. Activates monitoring and + * backup of streaming operator states even for jobs containing iterations. + * + * Please note that the checkpoint/restore guarantees for iterative jobs are + * only best-effort at the moment. Records inside the loops may be lost + * during failure. + *

+ *

+ * Setting this option assumes that the job is used in production and thus + * if not stated explicitly otherwise with calling with the + * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method + * in case of failure the job will be resubmitted to the cluster + * indefinitely. + * + * @param interval + * Time interval between state checkpoints in millis + * @param force + * If true checkpointing will be enabled for iterative jobs as + * well + */ + @Deprecated + public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) { + streamGraph.setCheckpointingEnabled(true); + streamGraph.setCheckpointingInterval(interval); + if (force) { + streamGraph.forceCheckpoint(); + } + return this; + } /** * Method for enabling fault-tolerance. Activates monitoring and backup of diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index d559ed3f14303..8ef4ca02e44e8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -79,6 +79,7 @@ public class StreamGraph extends StreamingPlan { private Map streamLoops; protected Map vertexIDtoLoop; private StateHandleProvider stateHandleProvider; + private boolean forceCheckpoint = false; public StreamGraph(StreamExecutionEnvironment environment) { @@ -118,6 +119,10 @@ public void setCheckpointingEnabled(boolean checkpointingEnabled) { public void setCheckpointingInterval(long checkpointingInterval) { this.checkpointingInterval = checkpointingInterval; } + + public void forceCheckpoint() { + this.forceCheckpoint = true; + } public void setStateHandleProvider(StateHandleProvider provider) { this.stateHandleProvider = provider; @@ -408,9 +413,11 @@ public JobGraph getJobGraph() { public JobGraph getJobGraph(String jobGraphName) { // temporarily forbid checkpointing for iterative jobs - if (isIterative() && isCheckpointingEnabled()) { + if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) { throw new UnsupportedOperationException( - "Checkpointing is currently not supported for iterative jobs!"); + "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " + + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " + + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } setJobName(jobGraphName); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 9e12a682f8da3..6a6e899c31781 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -398,7 +398,6 @@ private void configureCheckpointing() { JobSnapshottingSettings settings = new JobSnapshottingSettings( triggerVertices, ackVertices, commitVertices, interval); - jobGraph.setSnapshotSettings(settings); int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries(); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 343f495d063f1..e69f533cb2d71 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -65,6 +65,7 @@ public void registerInputOutput() { @SuppressWarnings("unchecked") @Override public void invoke() throws Exception { + isRunning = true; if (LOG.isDebugEnabled()) { LOG.debug("Iteration source {} invoked", getName()); } @@ -96,6 +97,7 @@ public void invoke() throws Exception { } finally { // Cleanup + isRunning = false; outputHandler.flushOutputs(); clearBuffers(); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 9d3651609026f..5bbae06a65c80 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -57,6 +57,8 @@ public void registerInputOutput() { @Override public void invoke() throws Exception { + isRunning = true; + if (LOG.isDebugEnabled()) { LOG.debug("Iteration sink {} invoked", getName()); } @@ -74,6 +76,7 @@ public void invoke() throws Exception { } finally { // Cleanup + isRunning = false; clearBuffers(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index bb641d9ce8d5f..b55272c80ecdb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -229,7 +229,7 @@ public void setInitialState(StateHandle stateHandle) throws Except @Override public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { - + synchronized (checkpointLock) { if (isRunning) { try { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index ebc4c93a817ac..9c6ff5c697858 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -207,6 +207,22 @@ public void testWithCheckPointing() throws Exception { } catch (UnsupportedOperationException e) { // expected behaviour } + + + // Test force checkpointing + + try { + env.enableCheckpointing(1, false); + env.execute(); + + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + env.enableCheckpointing(1, true); + env.getStreamGraph().getJobGraph(); } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 7371c9115fe01..c5030ed1e0f02 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -118,12 +118,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Method for enabling fault-tolerance. Activates monitoring and backup of streaming * operator states. Time interval between state checkpoints is specified in in millis. + * + * If the force flag is set to true, checkpointing will be enabled for iterative jobs as + * well.Please note that the checkpoint/restore guarantees for iterative jobs are + * only best-effort at the moment. Records inside the loops may be lost during failure. * * Setting this option assumes that the job is used in production and thus if not stated * explicitly otherwise with calling with the * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of * failure the job will be resubmitted to the cluster indefinitely. */ + @deprecated + def enableCheckpointing(interval : Long, force: Boolean) : StreamExecutionEnvironment = { + javaEnv.enableCheckpointing(interval, force) + this + } + + /** + * Method for enabling fault-tolerance. Activates monitoring and backup of streaming + * operator states. Time interval between state checkpoints is specified in in millis. + * + * Setting this option assumes that the job is used in production and thus if not stated + * explicitly otherwise with calling with the + * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of + * failure the job will be resubmitted to the cluster indefinitely. + */ def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = { javaEnv.enableCheckpointing(interval) this