
[FLINK-2976] Allow to trigger checkpoints manually #1434

Closed
wants to merge 9 commits into apache:master from uce:2976-savepoints

Conversation

uce
Contributor

@uce uce commented Dec 4, 2015

This PR contains documentation: https://github.com/uce/flink/blob/2976-savepoints/docs/apis/savepoints.md

In a nutshell, savepoints (*) are manually triggered checkpoints, which take a snapshot of a program and write it out to an external state backend. This allows you to stop and resume your program without losing intermediate state.

Why is this nice? Because you don't have to replay everything when you redeploy your long running streaming job after changing it or updating to a newer Flink version.

(*) Initially I wrote it as saFepoints, but then settled on saVepoints after stumbling across a related feature in an Oracle SQL reference. What do you think? 😄 http://doodle.com/poll/2z2cp9hxu7eucdsz

Example

Start your stateful streaming program via bin/flink run ....

$ bin/flink list
------------------ Running/Restarting Jobs -------------------
04.12.2015 13:51:10 : 46da86f25ca8daa1bbff8ccae64d53af : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

Wait for some checkpoints to complete:

$ tail -f log/flink-hadoop-client-uce-m.log
...
13:50:59,806 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 46da86f25ca8daa1bbff8ccae64d53af (Flink Streaming Job) changed to RUNNING.
...
13:55:37,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1449150937225
13:55:37,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1
13:55:42,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1449150942225
13:55:42,328 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2
...
13:56:27,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 11 @ 1449150987225
13:56:27,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 11
...

Trigger a savepoint and cancel the job:

$ bin/flink savepoint 46da86f25ca8daa1bbff8ccae64d53af
Triggering savepoint for job 46da86f25ca8daa1bbff8ccae64d53af. Waiting for response...
Savepoint completed. Path: jobmanager://savepoints/1
You can resume your program from this savepoint with the run command.

$ bin/flink cancel 46da86f25ca8daa1bbff8ccae64d53af

Now you can restart the program from the savepoint:

$ bin/flink run --fromSavepoint jobmanager://savepoints/1 ...

This will resume the application from the state of the savepoint.
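
For completeness, the job used above is assumed to be a regular streaming program with checkpointing enabled, roughly along these lines (StatefulSource and StatefulMapper are placeholders for the user's checkpointed source and operator, as in the uid() example further down):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StatefulJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Periodic checkpoints every 5 seconds; savepoints reuse the same snapshotting mechanism,
    // so the job must have checkpointing enabled.
    env.enableCheckpointing(5000);

    env.addSource(new StatefulSource())   // placeholder stateful source (e.g. Kafka)
       .map(new StatefulMapper())         // placeholder stateful mapper
       .print();

    env.execute("Flink Streaming Job");
  }
}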

Changes to Flink

I focussed on not changing any major Flink component for this. Savepoints use the same checkpointing mechanism as the periodic checkpoints with some plumbing around it.

Savepoint coordinator

In addition to the regular CheckpointCoordinator, we add a second coordinator instance, the SavepointCoordinator. This class extends the regular coordinator and registers callbacks for shutdown, fully acknowledged checkpoints, and cancelled checkpoints. For this, I've added three callback methods to the checkpoint coordinator, which are overridden by the savepoint coordinator. With two separate coordinators, periodic checkpoints and savepoints don't interfere with each other.

The savepoint coordinator manages a map of checkpoint ID => future. The futures are completed when the checkpoint is ack'ed or cancelled (or the coordinator shuts down altogether).
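
In other words, the bookkeeping looks roughly like this (a standalone sketch; the real coordinator lives in flink-runtime and uses different types, e.g. Scala futures, so names and signatures are illustrative):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SavepointFutures {

  // Pending savepoints: checkpoint ID => future that the client request waits on
  private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  // Called when a savepoint checkpoint is triggered
  CompletableFuture<String> register(long checkpointId) {
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(checkpointId, future);
    return future;
  }

  // Callback analogue: checkpoint fully acknowledged => complete with the savepoint path
  void onFullyAcknowledged(long checkpointId, String savepointPath) {
    CompletableFuture<String> future = pending.remove(checkpointId);
    if (future != null) {
      future.complete(savepointPath);
    }
  }

  // Callback analogue: checkpoint cancelled => fail the future
  void onCancelled(long checkpointId, Throwable cause) {
    CompletableFuture<String> future = pending.remove(checkpointId);
    if (future != null) {
      future.completeExceptionally(cause);
    }
  }

  // Callback analogue: coordinator shutdown => fail all pending futures
  void onShutdown() {
    for (CompletableFuture<String> future : pending.values()) {
      future.completeExceptionally(new IllegalStateException("Coordinator shut down"));
    }
    pending.clear();
  }
}

The trigger request (see JobManagerMessages below) then simply waits on the returned future and answers the client when it completes.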

Restoring

Restore happens on job submission if a savepoint path is provided in the JobSnapshottingSettings. The restore mechanism is similar to the regular checkpoint restore, but with some further sanity checks to ensure that the state to task mapping is correct (see below). All state has to be mapped to the restored program.
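
The core of that sanity check is simple (illustrative only; the actual types are Flink-internal):

import java.util.Map;
import java.util.Set;

class RestoreChecks {

  // Every vertex that carries state in the savepoint must exist in the newly
  // submitted program; otherwise the restore is rejected instead of silently
  // dropping state.
  static <ID> void checkStateMapping(Set<ID> verticesWithState, Map<ID, ?> tasksByVertexId) {
    for (ID vertexId : verticesWithState) {
      if (!tasksByVertexId.containsKey(vertexId)) {
        throw new IllegalStateException("Cannot map savepoint state for vertex " + vertexId
            + " to the submitted program (changed topology or missing uid()?).");
      }
    }
  }
}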

JobManagerMessages

Added TriggerSavepoint(JobID) and DisposeSavepoint(String) Akka messages to the job manager. They trigger and dispose savepoints, respectively. These operations work asynchronously and respond to the request when the savepoint futures complete. The requests are triggered by the user (see CLI frontend).

Hashing of StreamNodes

The state to task mapping of checkpoints happens via (jobVertexID, subtaskIndex). With this change, the jobVertexIDs of streaming programs are generated deterministically with respect to the structure of the program. This is needed to make sure that a restore with a new program can map the savepoint state to the tasks.

The hash starts from the sources and takes multiple things into account (see the sketch after this list):

  • parallelism
  • user function class
  • hash of the input
  • hash of the outputs
  • stream node ID
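
As a rough sketch of how these properties could be folded into one deterministic hash per node (the actual implementation traverses the graph from the sources and uses a stronger hash function, so this is illustrative only):

import java.util.List;
import java.util.Objects;

class NodeHashSketch {

  // Combine the properties listed above into a single deterministic value.
  // Objects.hash stands in for the real hashing scheme.
  static int nodeHash(int parallelism,
                      Class<?> userFunctionClass,
                      List<Integer> inputHashes,
                      List<Integer> outputHashes,
                      int streamNodeId) {
    return Objects.hash(parallelism, userFunctionClass.getName(), inputHashes, outputHashes, streamNodeId);
  }
}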

The automatic generation makes sure that you can use savepoints out of the box, but relying on it is not recommended, because you cannot change the program in any meaningful way (except for the internals of the user functions).

That's why the recommended option is to specify a unique ID as input to the hasher on the DataStream API level:

DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource()).uid("source-id")
  .shuffle()
  // The stateful mapper with ID
  .map(new StatefulMapper()).uid("mapper-id");

// Stateless sink (no specific ID required)
stream.print();

If you give IDs to all stateful operators, you can happily re-arrange and change the topology (except for parallelism, see below).

Application ID and DbStateBackend

Savepoints are pairs of (ApplicationID, CompletedCheckpoint). I've added a new ApplicationID to allow scoping tasks across multiple job submissions (which have changing job IDs). This is for example required to restore state in the DbStateBackend. After consulting with @gyfora I've changed all references to JobID in DbStateBackend to ApplicationID.

The ApplicationID is assigned in the ExecutionGraph only and is reset to the application ID of the savepoint if there is a restore operation. This touches almost none of the existing code; the ID is only propagated to the TaskDeploymentDescriptor and Environment of the Task instances.
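
Conceptually, a savepoint is stored as nothing more than this pair (illustrative sketch; ApplicationID and CompletedCheckpoint refer to the classes mentioned above, imports omitted):

// Illustrative sketch of the (ApplicationID, CompletedCheckpoint) pair.
class Savepoint {
  final ApplicationID applicationId;
  final CompletedCheckpoint checkpoint;

  Savepoint(ApplicationID applicationId, CompletedCheckpoint checkpoint) {
    this.applicationId = applicationId;
    this.checkpoint = checkpoint;
  }
}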

State storage

The state storage does not instantiate the regular state backend on the job manager. It is essentially a set of small helper classes, which allow putting state to and getting state from the file system or the job manager's heap. I think this is fine for now, because I didn't want to make changes to the central state abstractions, which are kind of in flux right now. But we should think about it in the future.
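
For example, the put/get layer could look roughly like this (interface name and signatures are illustrative, not the actual helper classes):

import java.io.Serializable;

// Illustrative put/get layer over either the file system or the job manager's
// heap, kept separate from the regular state backends.
interface StateStoreSketch<T extends Serializable> {

  // Store the state and return a path/handle such as "jobmanager://savepoints/1"
  // or a file system URI that can later be passed to getState().
  String putState(T state) throws Exception;

  // Retrieve previously stored state by its path/handle.
  T getState(String path) throws Exception;

  // Remove the stored state, e.g. when a savepoint is disposed.
  void disposeState(String path) throws Exception;
}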

Configuration and CLIFrontend

This works out of the box if the job uses checkpointing. The default state backend for savepoints is jobmanager, which allows stopping and restoring a program while the same job manager is running.

For configuration, there are two new keys:

state.backend.savepoints
state.backend.savepoints.fs.dir

If you don't specify these, the regular state backend configuration is used, with jobmanager as a fallback if no viable configuration is found.
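
For example, to write savepoints to a file system directory (values as in the review discussion below):

state.backend.savepoints: filesystem
state.backend.savepoints.fs.dir: hdfs:///flink/savepoints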

@uce uce force-pushed the 2976-savepoints branch 3 times, most recently from d86f5c1 to 25e6a08 on December 4, 2015 18:53
@rmetzger
Contributor

rmetzger commented Dec 5, 2015

Awesome change! I think it's a feature many users are asking for, because it's crucial for production use!
I'll try to review it soon.

@aljoscha
Contributor

aljoscha commented Dec 7, 2015

I think save point is the way to go, see here: https://en.wikipedia.org/wiki/Saved_game#Save_point 😄

@aljoscha
Contributor

aljoscha commented Dec 7, 2015

Very nice, I haven't yet gone through the code in detail but from the description and doc it looks very good.

One thing I was confused about is the naming of the config options:

state.backend.savepoints: filesystem
state.backend.savepoints.fs.dir: hdfs:///flink/savepoints

I think this could confuse users into thinking that a different state backend is used on the task manager to store the actual checkpoint data. If I'm not mistaken, this is purely a JobManager thing, though. Also, until now the naming scheme for state.backend was that state.backend.x.y is the setting of option y for state backend x; now it looks like there is a state backend called savepoints. Maybe we could change this to:

savepoints.state.backend: filesystem
savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

to clarify that this is changing the state backend settings for the savepoints on the JobManager.

@uce
Contributor Author

uce commented Dec 7, 2015

@aljoscha I like your proposal. I will rename it accordingly.

under the License.
-->

Programs written in the [Data Stream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without loosing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
Contributor

typo: loosing --> losing


**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.

**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
Contributor

Maybe we can say here more prominently that state.backend.savepoints: filesystem and state.backend: jobmanager does not make sense.

Contributor Author

+1

Let's also add it as a sanity check in the code


// Failure, respond with the cause
case scala.util.Failure(t) =>
senderRef ! TriggerSavepointFailure(jobId,
Contributor

If you break argument lists, then every argument should go on its own line.

@tillrohrmann
Contributor

Really good work @uce. I only had a comment concerning the hash generation and compatibility between topologies and some minor style comments. Once these are addressed +1 for merging.

@gyfora
Contributor

gyfora commented Dec 13, 2015

How do I increase the timeout for the savepoint? I rebased on the latest version, but it still times out after 100000 ms.

Otherwise awesome work :)

@uce
Contributor Author

uce commented Dec 14, 2015

@tillrohrmann Thanks for the review. I will address the remaining points and get back.

@gyfora I changed the client timeout to INF, but it somehow got back in. I will address this. I think it's fine to have it as INF and let the checkpoint timeout handle it (default 10 mins). The user can just break out of the client (which will not cancel the savepoint though – something we can address as a follow up).

@gyfora
Contributor

gyfora commented Dec 14, 2015

I think we should first check whether the savepoint exists or not before trying to restore from it. Currently an invalid savepoint will send the job into a restart loop.

@uce
Contributor Author

uce commented Dec 16, 2015

I've addressed the last comments.

Regarding the deterministic hashing and ordering of the graph: changing anything but the user function changes the hash (if not specified manually).

If we don't want this, we will run into collisions when you have the same transformations more than once in the graph. I think it is OK to be aggressive with this in the beginning. I've tried to make this point clearer in the docs.

@aljoscha
Contributor

I think this is good behavior: it is conservative, and if users want it differently they can manually specify the ID.

@gyfora
Contributor

gyfora commented Dec 30, 2015

Should we merge this?

@gyfora
Contributor

gyfora commented Jan 4, 2016

I rebased your branch on the current master; if you want, you can use this :)

https://github.com/gyfora/flink/commits/ufuk-2976-rebased

@uce
Contributor Author

uce commented Jan 11, 2016

Thanks Gyula! I will rebase and merge this if there are no objections.

…on stream node hash

[comments] Remove unused argument to method

[comments] Add more comments to stream node hashing

Add name to hash in order to detect swapped nodes when they have names

Improve error message on non-unique user-specified IDs

[comments] Add comment to stream node hashing
@uce
Contributor Author

uce commented Jan 11, 2016

I've rebased this. Waiting for Travis and then merging...

[comments] Rename config keys

[comments] Fix docs and don't overload savepoint backend configuration with checkpoint backend configuration

[comments] Use ConcurrentMap in HeapStateStore

[comments] Fix typo and add missing serialVersionUID

[comments] Fix Scala style

[comments] Fix Scala style

[docs] Emphasize dangers and recommended approaches

Add test to show inf restart loop on submission with unknown savepoint path

[comments] Suppress restart of savepoint recovery failure
…instead of JobID

[comments] Set larger timeout for future when triggering savepoint
@gyfora
Contributor

gyfora commented Jan 11, 2016

👍

@asfgit asfgit closed this in d739ee2 Jan 11, 2016
@uce uce deleted the 2976-savepoints branch January 21, 2016 09:35
@coveralls

Coverage Status

Changes Unknown when pulling d974334 on uce:2976-savepoints into apache:master.
