-
Notifications
You must be signed in to change notification settings - Fork 1
[FLINK-11813] Introduces basic JobManagerRunner implementation #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
c91336e
to
0dbdeff
Compare
private boolean delete(final File f) throws IOException { | ||
if (!f.exists()) { | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a failing test case for this? Ideally one that covers all filesystems (I'm not sure whether we have such abstract test in place)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I still have to add that 👍
} catch (IOException e) { | ||
log.warn("Could not properly mark job {} result as clean.", jobId, e); | ||
} | ||
private void cleanUpJobResult(JobID jobId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commitJobResult
?
|
||
private void initializeCompletedCheckpointStore() { | ||
try { | ||
this.completedCheckpointStore = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup executor uses a fixed thread pool [1], so we're not guaranteed to always have a same thread here.
public CompletableFuture<Acknowledge> cancel(Time timeout) { | ||
Preconditions.checkState( | ||
resultFuture != null, "The CheckpointJobDataCleanupRunner was not started, yet."); | ||
if (resultFuture.cancel(true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use-case for cancelling here? This will fail the future, but wouldn't the cleanup still keep executing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I have to look into it more closely.
return new ExecutionGraphInfo( | ||
ArchivedExecutionGraph.createSparseArchivedExecutionGraph( | ||
jobResult.getJobId(), | ||
"we might want to move the job name into the job result", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we also need this for the web ui?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that was my understanding. we need to provide some dummy response for now
@Override | ||
public void localCleanup(JobID jobId) throws IOException { | ||
checkNotNull(jobId); | ||
deleteLocalData(jobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also obtain a RW lock here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the pointer, I will verify it.
deleteLocalData(jobId); | ||
} | ||
|
||
private void deleteLocalData(JobID jobId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need an extra method? can the globalCleanup simply call the localCleanup instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do that. I just thought that it makes the reading of the code easier...
|
||
@Override | ||
public void releaseJobGraph(JobID jobId) {} | ||
public void localCleanup(JobID jobId) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems unnecessary
try { | ||
return createCompletedCheckpointStore( | ||
configuration, userCodeLoader, checkpointRecoveryFactory, log, jobId); | ||
return checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a big fan of moving this into the CheckpointRecoveryFactory interface. What is the intuition / benefit behind it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to instantiate the CompletedCheckpointStore
in different locations (i.e. in SchedulerBase
and in CheckpointJobDataCleanupRunner
). I didn't want to duplicate the logic for accessing the configuration to get the number of retained checkpoints.
But instead, we could also move the static method from SchedulerUtils
into some other utils class
* | ||
* @param jobId specifying the job to release the locks for | ||
* @throws Exception if the locks cannot be released | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we intentionally keep the old javadoc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I thought it would be more specific to the actual class.
…CleanableResource and GloballyCleanableResource
… GloballyCleanableResource
…leResource and GloballyCleanableResource
…esource and GloballyCleanableResource
…overyFactory.createRecoveredCompletedCheckpointStore
…b into more generic ArchivedExecutionGraph.createSparseArchivedExecutionGraph
… JobManager configuration into the interface
59ac153
to
ba31513
Compare
…lobStore interface Previously, FileSystemBlobStore's delete methods didn't return true if the corresponding file didn't exist and, therefore, was not deleted. BlobStore.delete and BlobStore.deleteAll have a different contract which states that, in that case, the method should return true. The internal FileSystemBlobStore.delete method got updated accordingly. Additionally, a test class covering the FileSystemBlobStore was added
05155b9
to
c513b0d
Compare
Closed in favor of #4 |
What is the purpose of the change
Two goals are covered by this PR:
GlobalCleanupStage
) and providing alocalCleanup
method in theDispatcher
with now retry mechanism involved, yet!JobManagerRunner
class that only instantiates the checkpoint-related components and implements close logic for them. No retry mechanism is applied, yet.Follow-ups:
RetryStategy
needs to be integratedJobManagerRunner
(i.e.JobMasterServiceLeadershipRunner
andCheckpointJobDataCleanupRunner
) have to be consolidated. General logic (the leader election) should be moved into an abstract classExecutionGraphInfoStore
in theCheckpointJobDataCleanupRunner
as a fallback.Brief change log
See individual commits and their messages for descriptions.
Verifying this change
[TODO] This PR contains only a prototype. Additional efforts (like implementing tests, adding documentation) need to be applied.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation