New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8777][state]Improve resource release for local recovery #5578
[FLINK-8777][state]Improve resource release for local recovery #5578
Conversation
@StefanRRichter Could you please have a look at this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for re-introducing the cleanup, I had some comments in-line.
@@ -90,6 +92,10 @@ | |||
@GuardedBy("lock") | |||
private boolean disposed; | |||
|
|||
/** Whether to discard the useless state when retrieve local checkpoint state. */ | |||
@Nonnull |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This annotation does not fit here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed.
@@ -90,6 +92,10 @@ | |||
@GuardedBy("lock") | |||
private boolean disposed; | |||
|
|||
/** Whether to discard the useless state when retrieve local checkpoint state. */ | |||
@Nonnull | |||
private boolean retrieveWithDiscard; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not make this a general default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed.
} | ||
|
||
@Override | ||
@Nullable | ||
public TaskStateSnapshot retrieveLocalState(long checkpointID) { | ||
synchronized (lock) { | ||
TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); | ||
|
||
Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move all the cleanup logic in a separate method that is just invoked here to separate the concerns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed.
@@ -285,4 +316,9 @@ public String toString() { | |||
", localRecoveryConfig=" + localRecoveryConfig + | |||
'}'; | |||
} | |||
|
|||
@VisibleForTesting | |||
void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove @Nonnull
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed.
|
||
if (retrieveWithDiscard) { | ||
// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others | ||
final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can de-duplicate code by extracting a method with the code from confirmCheckpoint(...)
, maybe called pruneCheckpoints(...)
. We can do the comparison for both use-cases as entryCheckpointId != checkpointID
and have a boolean parameter which determines if we break the iteration in the else
case or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 addressed.
1207165
to
c4c6987
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me, had a few suggestions.
* Pruning the useless checkpoints. | ||
*/ | ||
private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to add an assert that the thread holds lock
and document that this method should be called only when holding the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { | |||
TaskStateSnapshot snapshot; | |||
synchronized (lock) { | |||
snapshot = storedTaskStateByCheckpointID.get(checkpointID); | |||
|
|||
if (retrieveWithDiscard) { | |||
// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is no longer required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -90,6 +92,9 @@ | |||
@GuardedBy("lock") | |||
private boolean disposed; | |||
|
|||
/** Whether to discard the useless state when retrieve local checkpoint state. */ | |||
private boolean retrieveWithDiscard = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this? Is there any case for not doing the cleanup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, this is just for passing the existing test case in TaskLocalStateStoreImplTest
...
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
for (int i = off; i < len; ++i) {
TaskStateSnapshot expected = history.get(i);
Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
Mockito.verify(expected, Mockito.never()).discardState();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then there are two better options in my opinion, because the flag is pure boilerplate:
- Change the test to check what we are doing now, because that is what happens in the real use-case.
- Maybe even better: split the method
retrieveLocalState
further: one method for pruning, one package-private method that does all the pure retrieval, logging, andnull
transformation. In the oldretrieveLocalState
, do the cleanup first, then the pure retrieval/logging. Call the package private method in the test to avoid the cleanup.
Maybe the test should then also just do both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed and I prefer the second option.
Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next(); | ||
long entryCheckpointId = snapshotEntry.getKey(); | ||
|
||
if (entryCheckpointId != checkpointID) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a second though, while I think this code is currently correct, the case with breaking looks a bit dangerous. Potentially, if the checkpoint id is not there, this would not stop and prune ongoing checkpoints. I wonder if we should make the if
a bit more complex, but safer (checking that the breaking case never exceeds the checkpoint id). What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you that the breaking case looks a bit dangerous ... I think maybe we could pass a Predicate
for the if
and let the caller side pass the Predicate
into this function. This could make it cleaner from the caller side and don't need to mass the logic into the if
to make it complex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is fine, from my point of view that is just one way of making the if
more complex.
@StefanRRichter I have addressed your suggestions, except the one that to make the |
/** | ||
* Pruning the useless checkpoints, it should be called only when holding the {@link #lock}. | ||
*/ | ||
private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could either use LongPredicate
instead of Predicate<Long>
or not convert snapshotEntry.getKey()
to a primitive long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I prefer to use LongPredicate
, addressing ...
Thanks, I will merge this once the final points are addressed :-) |
Ok, I took another look at the complete picture and from the test got the feeling that retrieval and pruning should be two separated concerns and that not only should we have two internal methods, but maybe also expose them as different methods. For the sake to keep this short, I made a proposal in this branch, it is the last commit: https://github.com/StefanRRichter/flink/tree/improve_resource_release_for_local_recovery If you like the change, I would squash it and commit under your name, because you did all of the important parts. What do you think? |
@StefanRRichter Thanks, I like it! It looks very good. |
What is the purpose of the change
This PR fixes FLINK-8777. When recovery from failed,
TaskLocalStateStoreImpl.retrieveLocalState()
will be invoked, we can release all entry fromstoredTaskStateByCheckpointID
that does not satisfyentry.checkpointID == checkpointID
, this can prevent the resource leak when job loop inlocal checkpoint completed => failed => local checkpoint completed => failed ...
.Brief change log
Verifying this change
This changes can be verified by the exists tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation