-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-9575]: Remove job-related BLOBS only if the job was removed suce… #6322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
} | ||
|
||
jobManagerMetricGroup.removeJob(jobID) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line can also be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR @Wosin. I think we only need to make the blobServer.cleanupJob
call dependent on the success of the SubmittedJobGraphStore#removeJobGraph
call.
Furthermore, we should also do the same in the Dispatcher.java:577
.
It would be great to add a test for the cleanup behaviour in the Dispatcher
.
case Some(future) => future.onComplete{ | ||
case scala.util.Success(_) => { | ||
libraryCacheManager.unregisterJob(jobID) | ||
blobServer.cleanupJob(jobID, removeJobFromStateBackend) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we move these this line in the future where we remove the job from the SubmittedJobGraphStore
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically we can, but this changes the return type of the future as cleanupJob
does indeed return something.
case scala.util.Success(_) => { | ||
libraryCacheManager.unregisterJob(jobID) | ||
blobServer.cleanupJob(jobID, removeJobFromStateBackend) | ||
jobManagerMetricGroup.removeJob(jobID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could always execute this call independent of whether the removal from the SubmittedJobGraphStore
was successful or not.
futureOption match { | ||
case Some(future) => future.onComplete{ | ||
case scala.util.Success(_) => { | ||
libraryCacheManager.unregisterJob(jobID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call should also be called if the removal of the job from the SubmittedJobGraphStore
failed because it does not remove any HA files.
Hey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution @Wosin. LGTM. Merging this PR.
…essfully This closes apache#6322.
Wait, I have found an issue with my code. I will update the PR accordingly. |
No worries @Wosin. I've already fixed the problem. You can see it here: tillrohrmann@8b3a849 |
Ok! Thanks. |
…essfully This closes apache#6322.
What is the purpose of the change
Currently flink removes all blobs connected with the job, no matter if the job itself was removed successfully. This is not the desired behavior.
Brief change log
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation