-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-24086][checkpoint] Rebuilding the SharedStateRegistry only when the restore method is called for the first time. #17179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 12f5359 (Tue Sep 07 09:22:58 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
12f5359 to
203711c
Compare
|
Hi, @dawidwys, I'm sorry to update it for so long because I have been busy with other things recently. |
…n the restore method is called for the first time. From FLINK-22483, the CompletedCheckpointStore will not change during task failover, so we only need to rebuild the SharedStateRegistry once, which can reduce the recovery time during failover.
| JobID jobId, | ||
| int maxNumberOfCheckpointsToRetain, | ||
| ClassLoader userClassLoader, | ||
| SharedStateRegistry sharedStateRegistry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is a good idea. It does not define a clear contract for the SharedStateRegistry. Is it empty? Does it have entries? What should we do about it if it is not empty?
It should be up to the CheckpointRecoveryFactory to tell where does the SharedStateRegistry comes from.
| */ | ||
| boolean requiresExternalizedCheckpoints(); | ||
|
|
||
| void registerSharedState(Map<OperatorID, OperatorState> operatorStates); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mixes responsibilities of the two classes/interfaces (CompletedCheckpointStore & SharedStateRegistry). I am not against coupling those two (as they're lifecycles are coupled already), but not in this way.
Maybe it would make sense to add getSharedStateRegistry(). I guess we would need to extract an interface from SharedStateRegistry then.
|
@dmvk Do you have any opinion about the direction we're going for here? |
|
@dawidwys I agree with the sentiment of this PR. It is basically a follow up to https://issues.apache.org/jira/browse/FLINK-22483, that could further reduce the time needed for restoring the state. I'll try to look at the PR later, but from a quick look, I agree with your comments. We should try to keep the |
|
btw, good job @liming30, this is a great improvement |
|
Hi, @dawidwys, I checked the registration of the shared state again. Maybe the shared state in |
dawidwys
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I must apologize for the delay.
Generally speaking I quite like the structure, that the SharedStateRegistry comes out of CompletedCheckpointStore. I had some inline comments that need to be addressed.
Lastly, I'd really like to hear from @dmvk and/or @rkhachatryan before we merge it.
| } | ||
|
|
||
| @Test | ||
| public void testSharedStateRegistrationWithoutRebuildSharedStateRegistry() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test tests way more than it claims to do. Moreover I don't think it tests the no-rebuilding at all. None of the methods from CheckpointRecoveryFactory are used here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Dawid, it tests the existing functionality but the one that was changed.
I think CheckpointRecoveryFactory implementations should be unit-tested. And ideally, their use by schedulers too (maybe using TestingCheckpointRecoveryFactory).
WDYT?
| JobID jobId, | ||
| int maxNumberOfCheckpointsToRetain, | ||
| ClassLoader userClassLoader, | ||
| SharedStateRegistryFactory sharedStateRegistryFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we need the SharedStateRegistryFactory parameter. I'd say it should be up to the CompletedCheckpointStore/CheckpointRecoveryFactory to decide about the implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it could also be passed as a constructor parameter to CheckpointRecoveryFactory implementations, but such a factory reduces coupling (as opposed to directly constructing the registry inside CheckpointRecoveryFactory).
| this.maxRetainedCheckpoints = maxRetainedCheckpoints; | ||
| this.checkpoints.addAll(initialCheckpoints); | ||
|
|
||
| for (CompletedCheckpoint completedCheckpoint : this.checkpoints) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks wrong. If the sharedStateRegistry is restored, we will register checkpoints twice. If the SharedStateRegistry was passed from the outside we should assume it's properly populated already.
| CheckpointRecoveryFactory withoutCheckpointStoreRecovery(IntFunction<T> storeFn) { | ||
| return new PerJobCheckpointRecoveryFactory<>( | ||
| (maxCheckpoints, previous) -> { | ||
| (maxCheckpoints, previous, sharedStateRegistry) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need this extra parameter? You have the sharedStateRegistry inside of the previous already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess previous is optional during the 1st invokation?
rkhachatryan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @liming30 .
LGTM in general, I've left some remarks, PTAL.
| JobID jobId, | ||
| int maxNumberOfCheckpointsToRetain, | ||
| ClassLoader userClassLoader, | ||
| SharedStateRegistryFactory sharedStateRegistryFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it could also be passed as a constructor parameter to CheckpointRecoveryFactory implementations, but such a factory reduces coupling (as opposed to directly constructing the registry inside CheckpointRecoveryFactory).
| CheckpointRecoveryFactory withoutCheckpointStoreRecovery(IntFunction<T> storeFn) { | ||
| return new PerJobCheckpointRecoveryFactory<>( | ||
| (maxCheckpoints, previous) -> { | ||
| (maxCheckpoints, previous, sharedStateRegistry) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess previous is optional during the 1st invokation?
| } | ||
|
|
||
| @Test | ||
| public void testSharedStateRegistrationWithoutRebuildSharedStateRegistry() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Dawid, it tests the existing functionality but the one that was changed.
I think CheckpointRecoveryFactory implementations should be unit-tested. And ideally, their use by schedulers too (maybe using TestingCheckpointRecoveryFactory).
WDYT?
| // We create a new shared state registry object, so that all pending async disposal | ||
| // requests from previous runs will go against the old object (were they can do no | ||
| // harm). This must happen under the checkpoint lock. | ||
| sharedStateRegistry.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this call now missing and the old registry isn't closed?
|
Hi @liming30 , are you planning to merge this PR in 1.15? If you aim at a different release then I could implement those changes in my PR (or take over this PR if you don't mind). |
|
We're also preparing some follow-up PRs that build on top of this one. It would be great if we could finish it soon ;) |
pushing SharedStateRegistry creation down the stack and passing checkpoints to it
by pushing SharedStateRegistry creation down the stack and passing checkpoints to it
by closing SharedStateRegistry on CompletedCheckpointStore.shutdown
|
I've opened #18001 to address the feedback above, rebase and merge it. |
by pushing SharedStateRegistry creation down the stack and passing checkpoints to it
by closing SharedStateRegistry on CompletedCheckpointStore.shutdown
by pushing SharedStateRegistry creation down the stack and passing checkpoints to it
by closing SharedStateRegistry on CompletedCheckpointStore.shutdown
|
superseded by #18001 |
[FLINK-24086][checkpoint] Rebuilding the SharedStateRegistry only when the restore method is called for the first time.
From FLINK-22483, the CompletedCheckpointStore will not change during task failover, so we only need to rebuild the SharedStateRegistry once, which can reduce the recovery time during failover.
What is the purpose of the change
Move
SharedStateRigistrytoCompleteCheckpointStoreto make the life cycle of them consistent, so that we only need to re-register the shared state once when theCompleteCheckpointStoreis restored, which can reduce the time of task failover.Brief change log
SharedStateRegistryto thecreateRecoveredCompletedCheckpointStoremethod ofCheckpointRecoveryFactory, and letCompletedCheckpointStoremanageSharedStateRegistryby itself.CheckpointCoordinatorno longer managesSharedStateRegistry.CompletedCheckpointStoreis recovering, it will also register the shared state of the recoveredCompletedCheckpoint.CompletedCheckpointStoreadds theregisterSharedStatemethod, which is used to provide an interface for theCheckpointCoordinatorto register the shared state.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation