[FLINK-2354] Add job graph and checkpoint recovery #1153

Closed · wants to merge 8 commits

Conversation

uce (Contributor) commented Sep 21, 2015

tl;dr

This PR introduces JobGraph and SuccessfulCheckpoint recovery for submitted programs in case of JobManager failures.

General Idea

The general idea is to persist job graphs and successful checkpoints in ZooKeeper.

We have introduced JobManager high availability via ZooKeeper in #1016. My PR builds on top of this and adds initial support for program recovery. We can recover both programs and successful checkpoints in case of a JobManager failure as soon as a standby job manager is granted leadership.

ZooKeeper's sweet spot is rather small data (in the KB range), but job graph and checkpoint state can grow larger. Therefore we don't persist the actual metadata directly, but use the state backend as a layer of indirection: we create state handles for the job graph and completed checkpoints and persist those. The state handle acts as a pointer to the actual data.

At the moment, only the file system state backend is supported for this. The state handles need to be accessible from both task and job managers (e.g. a DFS).
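
To make the indirection concrete, here is a minimal sketch, assuming Curator and illustrative paths (the actual implementation serializes a state handle object rather than a plain path string):

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class StateHandlePointerSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Stand-in for a serialized state handle: the payload is only a
        // pointer to the actual (possibly large) data in the DFS.
        byte[] pointer = "hdfs:///path/to/recovery/submittedJobGraph-42"
                .getBytes(StandardCharsets.UTF_8);

        // The ZNode stays small regardless of how big the job graph is.
        client.create()
                .creatingParentsIfNeeded()
                .forPath("/flink/jobgraphs/some-job-id", pointer);

        client.close();
    }
}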

Configuration

The minimal required configuration:

recovery.mode: ZOOKEEPER
ha.zookeeper.quorum: <ZooKeeper quorum peers>
state.backend: FILESYSTEM
state.backend.fs.dir.recovery: /path/to/recovery

I don't like the current configuration keys. Before the next release, I would like to move to more consistent naming, e.g. prefixing everything with recovery.zookeeper.

ZooKeeper Nodes Overview

Overview of ZNodes and components managing them:

O- /flink
|
+----O /flink/jobgraphs (SubmittedJobGraphs)
|    |
|    +----O /flink/jobgraphs/<job-id>
|
+----O /flink/checkpoints  (CompletedCheckpoints)
|    |
|    +----O /flink/checkpoints/<job-id>
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/1
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/N
|
+----O /flink/checkpoint-counter (CheckpointIDCounter)
     |
     +----O /flink/checkpoint-counter/<job-id>

Implementation

Submission vs. Recovery (JobManager and SubmittedJobGraphs)

  • ZooKeeperSubmittedJobGraphs manages SubmittedJobGraph state handles in ZooKeeper.
  • Submission and recovery follow mostly the same code paths (see JobManager#submitJob()).
  • On (initial) submission:
    • After writing to ZooKeeper, the JobManager synchronously checks whether it is still the leader.
    • If not, the job is not scheduled for execution but is kept in ZooKeeper; a future leading JobManager has to recover it. The client currently sees this as a successful submission. The job is not removed in this case, because another job manager might recover it between the write and the remove. In such a case, a job would be running without being in ZooKeeper and without being acked to the client. (A sketch of this check follows this list.)
  • On recovery:
    • Recovery is triggered once leadership is granted, after the configured execution delay.
    • All available jobs are scheduled for execution.
  • The ZNode for job graphs is monitored for modifications during operations. This way, a job manager can (eventually) detect if another job manager adds/removes a job and react to it.
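
A sketch of the submission-time leader check described above (submittedJobGraphs, leaderElectionService and scheduleForExecution are illustrative stand-ins, not the PR's exact API):

// Hypothetical sketch of the ZooKeeper submission path.
void submitJob(SubmittedJobGraph jobGraph, boolean isRecovery) throws Exception {
    if (!isRecovery) {
        // Persist the state handle (pointer to the DFS data) in ZooKeeper.
        submittedJobGraphs.putJobGraph(jobGraph);

        // Synchronously re-check leadership *after* the write.
        if (!leaderElectionService.hasLeadership()) {
            // Keep the ZNode: another job manager might already be
            // recovering this job. A future leader will pick it up.
            return;
        }
    }

    // Initial submission as leader, or recovery: schedule for execution.
    scheduleForExecution(jobGraph);
}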

CompletedCheckpoints

  • ZooKeeperCompletedCheckpoints manages SuccessfulCheckpoint state handles in ZooKeeper (per job). Note that a SuccessfulCheckpoint usually has pointers to further state handles; in that case, we add another layer of indirection.
  • Every completed checkpoint is added to ZooKeeper and identified by its checkpoint ID.
  • On recovery, only the latest checkpoint is recovered, even if more than one is available. This keeps the checkpoint history consistent in corner cases where multiple job managers run the same job with checkpointing for some time (currently we retain only one checkpoint anyway, but this matters if we ever choose to retain more). A sketch of this lookup follows this list.
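
A minimal sketch of that lookup, assuming checkpoint ZNodes are named by their numeric IDs as in the overview above (the PR's ZooKeeperCompletedCheckpoints works with state handles rather than raw child names):

import java.util.List;

import org.apache.curator.framework.CuratorFramework;

// Sketch: find the ID of the latest completed checkpoint for a job.
long findLatestCheckpointId(CuratorFramework client, String jobId) throws Exception {
    List<String> children = client.getChildren()
            .forPath("/flink/checkpoints/" + jobId);

    // Children are the checkpoint IDs (1..N); recover the largest one only.
    return children.stream()
            .mapToLong(Long::parseLong)
            .max()
            .orElseThrow(() -> new IllegalStateException("No completed checkpoint"));
}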

CheckpointIDCounter

  • ZooKeeperCheckpointIDCounter manages a shared counter in ZooKeeper (per job).
  • The Checkpointed interface requires ascending checkpoint IDs for each checkpoint.
  • We use a shared counter (per job) via a Curator recipe for this (see the sketch after this list).
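
For reference, a self-contained sketch of such a per-job counter using Curator's SharedCount recipe (connection string and job ID are placeholders):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CheckpointCounterSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // One counter per job; checkpoint IDs start at 1.
        SharedCount counter = new SharedCount(
                client, "/flink/checkpoint-counter/some-job-id", 1);
        counter.start();

        // Optimistic get-and-increment: retry if another job manager raced us.
        long checkpointId;
        while (true) {
            VersionedValue<Integer> current = counter.getVersionedValue();
            if (counter.trySetCount(current, current.getValue() + 1)) {
                checkpointId = current.getValue();
                break;
            }
        }
        System.out.println("Next checkpoint ID: " + checkpointId);

        counter.close();
        client.close();
    }
}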

Akka messages

  • This PR introduces two new JobManager message types:
    • RecoverAllJobs
    • RecoverJob(JobID)
  • The ZooKeeper operations are blocking, and all JobManager actor calls need to make sure not to block the JobManager. I've tried to cover all cases where a ZooKeeper operation is triggered (see the sketch after this list).
  • For tests, I didn't manage to stop the JobManager actor without running the postStop method. Because this method has some cleanup logic (removing job graphs and checkpoints), all JobManager recovery tests run the JobManager as a separate JobManagerProcess. This is quite heavyweight. If someone knows a way to stop the actor without postStop being called, it would be great to refactor this.
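
One common way to keep such blocking calls off the actor thread, sketched with Akka's Java API (RecoverAllJobs and the ZooKeeper call are stand-ins for the PR's actual messages and logic):

import java.util.Collections;
import java.util.List;

import akka.actor.UntypedActor;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import scala.concurrent.ExecutionContext;

public class RecoverySketchActor extends UntypedActor {

    public static final class RecoverAllJobs {}

    @Override
    public void onReceive(Object message) {
        if (message instanceof RecoverAllJobs) {
            ExecutionContext ec = getContext().system().dispatcher();

            // Run the blocking ZooKeeper access on a future and pipe the
            // result back to this actor as an ordinary message; the actor
            // thread itself never blocks.
            Patterns.pipe(
                    Futures.future(this::recoverAllJobGraphsFromZooKeeper, ec),
                    ec
            ).to(getSelf());
        }
        else if (message instanceof List) {
            // The recovered job graphs arrive here; schedule them.
        }
        else {
            unhandled(message);
        }
    }

    private List<String> recoverAllJobGraphsFromZooKeeper() {
        // Placeholder for the blocking ZooKeeper read.
        return Collections.emptyList();
    }
}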

Next Steps

  • Behaviour on recovery via fixed delay is too simplistic.
  • The client is not fully integrated and submits jobs in detached mode if recovery mode is set to ZooKeeper.

Tests

There was a Travis/AWS outage yesterday, so I couldn't run as many builds as I wanted to yet. I would like to get a couple of runs in before we merge this.

tillrohrmann (Contributor) commented:

Massive PR @uce :-) Reviewing it will take me 1-2 days I guess.

Concerning the problem with the postStop method, you could start a JobManager where you simply override the postStop method to do nothing.
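
For illustration, in Akka's Java API this could look like the following (TestingJobManager and its parent are hypothetical; the actual JobManager is a Scala actor):

// Inherit all behaviour but make postStop a no-op, so stopping the actor
// in a test skips the HA cleanup (removal of job graphs and checkpoints).
public class TestingJobManager extends JobManagerActor {
    @Override
    public void postStop() {
        // Intentionally empty for tests.
    }
}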

uce force-pushed the recovery-2354 branch 6 times, most recently from 2d4fbc0 to ae19782 on September 23, 2015 18:01
uce (Contributor, Author) commented Sep 23, 2015

@tillrohrmann I've rebased this on the current master and fixed two issues (the two new commits).

Session management has been added recently to the master. I don't think it works with HA at this point. I haven't checked this yet and would postpone until session management is exposed to the user.

tillrohrmann (Contributor) commented:

Good to hear. I'll try to review it then, but probably won't get to it before Monday, because tomorrow I'm on vacation.

StephanEwen (Contributor) commented:

The implication of sessions would only be that jobs are kept in the currentJobs map even once they are finished. That should work transparently with HA: the jobs would not be removed from ZooKeeper until they are disposed.

uce (Contributor, Author) commented Sep 24, 2015

Currently, ZooKeeper only stores the latest added job graph (existing ones are overwritten), whereas in the JobManager the JobGraph is attached to the existing ExecutionGraph. In case of recovery, only the latest "attached" JobGraph will be recovered.

uce (Contributor, Author) commented Sep 30, 2015

I've rebased this on the current master and added a manual ChaosMonkeyTest. The test is friendly in the sense that it waits for task managers to reconnect to the job manager before it stops the next job manager/task manager.

uce (Contributor, Author) commented Oct 2, 2015

While working on FLINK-2804, I've noticed an issue: the user jars are only uploaded to the leading job manager at submission time and are then not available to the other job managers on recovery. A simple solution is to make the user jars available via the file state backend as well.


// some sanity checks
if (job == null || tasksToTrigger == null ||
        tasksToWaitFor == null || tasksToCommitTo == null) {
    throw new NullPointerException();
}
if (numSuccessfulCheckpointsToRetain < 1) {
    throw new IllegalArgumentException("Must retain at least one successful checkpoint");
}
if (checkpointTimeout < 1) {
    throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
}

this.job = job;
Review comment (Contributor):
Maybe we could harmonize the null checking as you've done it.

Reply (Contributor, Author):

Yes, I've changed it in some places and not in others.

Reply (Contributor, Author):
Resolved

uce (Contributor, Author) commented Oct 8, 2015

Till found another issue in one of his Travis runs, which has been addressed in e54a86c.

This is now rebased on the current master.

tillrohrmann pushed commits to tillrohrmann/flink that referenced this pull request, Oct 8-9, 2015
uce (Contributor, Author) commented Oct 9, 2015

Rebased on the current master and incorporated the job manager state modification fix. Thanks for that!

Can we merge this after Travis gives the green light?

Internal actor states must only be modified within the actor thread.
This avoids the well-known issues that come with concurrency.

Fix RemoveCachedJob by introducing RemoveJob

Fix JobManagerITCase
tillrohrmann (Contributor) commented:

I made some more fixes for the shading of the Curator dependency. Once Travis gives the green light, I'll merge it.

tillrohrmann pushed commits to tillrohrmann/flink that referenced this pull request, Oct 11-19, 2015
asfgit closed this in 73c73e9 on Oct 20, 2015
cfmcgrady pushed a commit to cfmcgrady/flink that referenced this pull request Oct 23, 2015
lofifnc pushed a commit to lofifnc/flink that referenced this pull request Oct 23, 2015
@uce uce deleted the recovery-2354 branch December 24, 2015 17:38