[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start#4410
Merged
asfgit merged 3 commits intoapache:masterfrom Aug 15, 2017
Merged
Conversation
Contributor
Author
|
CC @aljoscha |
This was referenced Jul 27, 2017
Closed
Contributor
fdc84c0 to
10adb9d
Compare
…Case This helps tease out races, for example the recently discovered one in cleanup of incremental state handles at the SharedStateRegistry. (cherry picked from commit d7683cc)
10adb9d to
d29bed3
Compare
tedyu
reviewed
Aug 23, 2017
| sharedStateRegistry.close(); | ||
| sharedStateRegistry = sharedStateRegistryFactory.create(executor); | ||
|
|
||
| // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery |
Contributor
There was a problem hiding this comment.
If we use highAvailabilityServices.getJobManagerLeaderRetriever(), Job Id is required.
Can Job Id be obtained from JobVertexID ?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This PR fixes FLINK-7268. The problem was that
ZookeeperCompletedCheckpointStoredeletes checkpoints asynchronously. When this happens parallel to a restart, it could happen that the async delete performed shared state de-registration.Before this PR, the old
SharedStateRegistrywas kept after restart and the counts where updated from the completed checkpoint store. In the described race, a checkpoint that has a pending delete will not contribute to the new count, but it can still decrement the count once the shared state is unregistered in the async deletion thread. This can accidentally drop counts below 1 and lead to data loss.The core idea behind the PR is to scope the
SharedStateRegistryper (re-)start, so that old pending deletes cannot influence the current count.SharedStateRegistryis now created via a factory that is passed into theCheckpointCoordinatorto simplify testing.The PR also introduces additional tests and improves the debug/trace logging of incremental checkpointing.
Verifying this change
This change added tests and can be verified as follows:
Run a job with keyed state, using incremental checkpoints and HA mode. Kill TMs to trigger recovery. After a couple of attempts, the problematic condition should be triggered, leading to an infinite recovery loop due to
FileNotFoundException. This should be fixed in the PR.Additional tests:
HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCaseCheckpointCoordinatortest::testSharedStateRegistrationOnRestoreIncrementalKeyedStateHandleTest::testSharedStateReRegistrationDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation