
Conversation

XComp (Owner) commented Dec 2, 2021

What is the purpose of the change

This PR introduces the cleanup of dirty job results in the Dispatcher. For now, the cleanup is only triggered once via the ioExecutor.

Brief change log

See the individual commit messages for further details on each change.

Verifying this change

TODO: No extensive tests have been added yet.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? TODO (not applicable / docs / JavaDocs / not documented)

XComp added 22 commits December 1, 2021 22:16
… PartialDispatcherServicesWithJobPersistenceComponents
…overyFactory.createRecoveredCompletedCheckpointStore
… JobManager configuration into the interface
XComp force-pushed the FLINK-11813-cleanup branch from 5ac5028 to 30ac5c1 on December 2, 2021 14:03
XComp force-pushed the FLINK-11813-cleanup branch from 30ac5c1 to d8b8782 on December 3, 2021 08:03
return false;
}

private void cleanupDirtyJobs() {

XComp (Owner, Author) commented

It might be worth moving all the Dispatcher's cleanup logic into its own class, e.g. DispatcherCleanup.
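
For illustration, the suggested extraction might look roughly like this (a minimal sketch; the class name DispatcherCleanup comes from the comment above, while its members are only illustrative):

    import java.util.concurrent.Executor;

    final class DispatcherCleanup {
        private final Executor ioExecutor;

        DispatcherCleanup(Executor ioExecutor) {
            this.ioExecutor = ioExecutor;
        }

        // The cleanup methods currently living in the Dispatcher
        // (dirty-job cleanup, BlobServer, HA services, ...) would move here.
        void cleanupDirtyJobs() {
        }
    }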


XComp commented Dec 3, 2021

I realized that it's not smart to use the same CheckpointsCleaner instance everywhere. It implements AutoCloseable, which we should make use of. Hence, providing a CheckpointsCleanerFactory in the JobManagerSharedServices would be the better approach.
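
A factory along those lines could look like this (a sketch; CheckpointsCleanerFactory does not exist yet, and the wiring through JobManagerSharedServices as well as the no-argument constructor usage are assumptions):

    import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;

    @FunctionalInterface
    interface CheckpointsCleanerFactory {
        CheckpointsCleaner create();
    }

    // JobManagerSharedServices would expose the factory so that each component
    // creates (and later closes) its own CheckpointsCleaner instead of sharing one:
    CheckpointsCleanerFactory factory = CheckpointsCleaner::new;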

}));

cleanupTaskResults.add(
CompletableFuture.supplyAsync(

Reviewer commented

We should never use async future operations without explicitly providing an executor. Otherwise they run on the JVM-wide common pool, which can lead to subtle memory leaks (e.g. via thread locals) and other unexpected behavior.

XComp (Owner, Author) commented

Good point, this should run on the ioExecutor.
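
For illustration, the difference boils down to the following (a sketch based on the snippet above; ioExecutor is the Dispatcher's I/O executor):

    // Implicit executor: runs on the JVM-wide ForkJoinPool.commonPool().
    CompletableFuture<Boolean> onCommonPool =
            CompletableFuture.supplyAsync(() -> blobServer.cleanupJob(jobId, jobGraphRemoved));

    // Explicit executor: runs on the Dispatcher's ioExecutor as suggested.
    CompletableFuture<Boolean> onIoExecutor =
            CompletableFuture.supplyAsync(
                    () -> blobServer.cleanupJob(jobId, jobGraphRemoved), ioExecutor);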

return blobServer.cleanupJob(jobId, jobGraphRemoved);
}

private boolean cleanupHighAvailabilityServices(JobID jobId) {

Reviewer commented

All of these cleanup methods look alike. Would it be possible to extract them behind a common interface and simplify this (e.g. something along the lines of a List<CleanupStage>)?
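
Such an interface might look like this (a sketch; CleanupStage is just the name suggested above, and the listed stages are placeholders):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.api.common.JobID;

    @FunctionalInterface
    interface CleanupStage {
        boolean cleanup(JobID jobId);
    }

    // The individual cleanup methods would then collapse into a single list:
    List<CleanupStage> cleanupStages =
            Arrays.asList(
                    this::cleanupHighAvailabilityServices
                    /* ... further stages for the BlobServer, JobResultStore, ... */);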

XComp (Owner, Author) commented

Having an interface was my initial approach. I backed off from it because I realized that the retry mechanism in FutureUtils only relies on passing in a callback. The interface didn't bring any value, so I left it out for now.
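
For reference, the callback-based retry looks roughly like this (a sketch; the FutureUtils package and exact signature differ between Flink versions, and numRetries is a placeholder):

    // org.apache.flink.runtime.concurrent.FutureUtils in 1.14-era code;
    // moved to org.apache.flink.util.concurrent in later versions.
    CompletableFuture<Boolean> haCleanup =
            FutureUtils.retry(
                    () -> CompletableFuture.supplyAsync(
                            () -> cleanupHighAvailabilityServices(jobId), ioExecutor),
                    numRetries,
                    ioExecutor);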

try {
jobResultStore.markResultAsClean(jobId);
} catch (IOException e) {
log.warn("Could not properly mark job {} result as clean.", jobId, e);

Reviewer commented

What are the consequences of ignoring the failure here?

XComp (Owner, Author) commented

In case of failure, the job should be picked up again for cleanup.

}

startRecoveredJobs();
cleanupDirtyJobs();

Reviewer commented

I'm not sure about having a separate code path for already terminated jobs, as the Dispatcher is already fairly complex.

What do you think about unifying these and having a custom JobManagerRunner implementation that only performs the checkpoint cleanup?

XComp (Owner, Author) commented

I guess that's a good point. It should be doable; it just didn't cross my mind. The JobManagerRunner could be in charge of handling the job's lifecycle entirely, i.e. including the cleanup. I will work on it.

XComp (Owner, Author) commented Dec 8, 2021

I discussed the JobManagerRunner approach with Chesnay. We looked at the issue from a user's point of view: we somehow want to visualize the running jobs as well as the jobs that are completed but still in the cleanup phase. Having everything in Dispatcher.runningJobs makes sense if we want the cleanup phase to still be part of the general job lifecycle. Dirty jobs that need to be cleaned up would be listed as not-finally-completed jobs until the cleanup succeeds. The JobManagerRunner could be responsible for maintaining this lifecycle (even after the JobMaster has finished). That would mean moving the cleanup logic into the JobManagerRunner and having it used by both the JobMasterServiceLeadershipRunner and the new JobManagerRunner implementation that is in charge of cleaning up the checkpoint-related artefacts.

The flaw of this approach is that running jobs are meant to have an ExecutionGraphInfo, which is provided by the JobMaster. For the cleanup, we don't have this information. It is accessible through the ExecutionGraphInfoStore in the Dispatcher, though, so we could use that store in the new implementation. But the ExecutionGraphInfoStore is only persisted locally; the data would be gone in case of a failover.

One workaround would be moving the ExecutionGraphInfo into the JobResultStore as additional metadata, essentially merging the ExecutionGraphInfoStore and the JobResultStore into a single store. The issue with that approach is that the ExecutionGraphInfoStore currently has no backwards-compatibility requirements. Persisting the ExecutionGraph in the JobResultStore would change that, which is not what we want, I guess. Having a JobResultStore that stores only a limited amount of metadata per job makes it easier to maintain backwards compatibility.

Another approach is treating the two phases separately: Dispatcher.runningJobs would only list jobs with a JobMaster providing the ExecutionGraph, while the dirty jobs would be handled through a separate member (and exposed through a dedicated REST API endpoint) as currently implemented in this prototype PR.

We can move the cleanup logic into its own class to remove complexity from the Dispatcher.

XComp (Owner, Author) commented

How is the user then informed about problems? There's going to be a dedicated REST endpoint (and a new section in the Flink UI) listing the jobs from the JobResultStore; any dirty jobs can be labeled as "cleanup pending". The Dispatcher retries the cleanup indefinitely, and the user is informed about issues through the logs. We could also think about adding some kind of retry counter that is persisted in the JobResultStore.

XComp (Owner, Author) commented Dec 8, 2021

How does the user handle failures? The cluster shutdown could either block until all jobs are cleaned up, or fail with a non-zero exit code if some jobs cannot be cleaned up. The latter would trigger a restart in the HA case, which would pick up the dirty jobs again for cleanup. Here, it might make sense to set some limit on the retries once a shutdown has been triggered.

XComp (Owner, Author) commented Dec 8, 2021

Thinking about it once more, we could actually follow your approach by extracting a new interface out of the JobManagerRunner interface that only contains getResultFuture(), start() and getJobID(). The JobManagerRunner could extend this interface. A new implementation of the extracted interface could be used to implement the checkpoint-related cleanup; after that is done, the common cleanup of the other components can be triggered. We could add a method cancelCleanup() to the new interface to enable explicit cancelling of the cleanup phase. Chesnay voted against reusing JobManagerRunner.cancel() because of the different semantics (cancelling the job results in a cancelled job, whereas cancelling the job cleanup still results in a globally-terminated job).

That would enable us to cancel the cleanup of a single job (through a new REST endpoint) without shutting down the cluster. We still have to decide how to handle a running job for which the cancellation of the cleanup is called.
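
A rough sketch of the extracted interface described above (the name JobLifecycleRunner and the cancelCleanup() return type are placeholders; the other methods mirror the existing JobManagerRunner, whose exact return types may differ between versions):

    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;

    interface JobLifecycleRunner {
        JobID getJobID();

        void start() throws Exception;

        CompletableFuture<JobManagerRunnerResult> getResultFuture();

        // Explicitly cancels the cleanup phase; deliberately separate from
        // JobManagerRunner#cancel because of the different semantics discussed above.
        CompletableFuture<Void> cancelCleanup();
    }

    // JobManagerRunner (and thus the JobMasterServiceLeadershipRunner) would extend
    // this interface, while a new implementation handles only the checkpoint-related cleanup.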

}

@VisibleForTesting
static CompletedCheckpointStore createCompletedCheckpointStore(

Reviewer commented

What is the intuition behind moving this into CheckpointRecoveryFactory?

XComp (Owner, Author) commented

The intuition was that I wanted to reuse the CheckpointsCleaner instance in the cleanup of the CompletedCheckpointStore. But that's not possible as is, because the CheckpointsCleaner is closed on the JobMaster. Instead, we should use a CheckpointsCleanerFactory that is passed into the different components that need a CheckpointsCleaner instance.

XComp force-pushed the FLINK-11813-move-jrs-init branch 2 times, most recently from e6037a0 to 59ac153 on December 9, 2021 17:07

XComp commented Dec 10, 2021

Closing this draft in favor of draft PR #3, which implements the JobManagerRunner interface.

XComp closed this on Dec 10, 2021
XComp added a commit that referenced this pull request Jan 21, 2022
XComp added a commit that referenced this pull request Jan 21, 2022
…dirty JobResults

We don't want to retrigger jobs that finished already based on the JobResultStore.
XComp added a commit that referenced this pull request Jan 21, 2022
XComp added a commit that referenced this pull request Jan 21, 2022
…dirty JobResults

We don't want to retrigger jobs that finished already based on the JobResultStore.
XComp added a commit that referenced this pull request Jan 22, 2022
…erProcess

This change now covers all cases for recovery.
XComp added a commit that referenced this pull request Jan 23, 2022
XComp added a commit that referenced this pull request Feb 3, 2022
…o be used by both the local and global cleanup
XComp added a commit that referenced this pull request Feb 3, 2022
XComp added a commit that referenced this pull request Feb 3, 2022
XComp added a commit that referenced this pull request Feb 3, 2022
XComp added a commit that referenced this pull request Feb 3, 2022
XComp added a commit that referenced this pull request Oct 21, 2022
XComp pushed a commit that referenced this pull request Nov 2, 2023
XComp added a commit that referenced this pull request Mar 22, 2024
XComp added a commit that referenced this pull request Apr 2, 2024
XComp added a commit that referenced this pull request Apr 2, 2024