Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a7d2cee
[FLINK-11813] Adds generic interfaces for cleaning up Job-related data
XComp Dec 15, 2021
70e2394
[FLINK-11813] Refactors JobGraphWriter interface to implement Locally…
XComp Dec 15, 2021
0ecaba8
[FLINK-11813] Makes BlobServer implement LocallyCleanableResource and…
XComp Dec 15, 2021
4b7f8ef
[FLINK-11813] Makes HighAvailabilityServices implement LocallyCleanab…
XComp Dec 15, 2021
d2fe489
[FLINK-11813] Makes JobManagerMetricGroup implement LocallyCleanableR…
XComp Dec 15, 2021
34d754f
[FLINK-11813] Removes unused classloader parameter from CheckpointRec…
XComp Nov 25, 2021
7fb38c0
[hotfix] Refactors nested if statements
XComp Dec 8, 2021
e1a02d7
[hotfix] Minor fixes in ApplicationStatus
XComp Dec 9, 2021
fc835b1
[FLINK-11813] Renames ArchivedExecutionGraph.createFromInitializingJo…
XComp Dec 9, 2021
704e010
[FLINK-11813] Adds method to make the TestingJobManagerRunner fail du…
XComp Dec 15, 2021
78e6199
[FLINK-11813] Adds JobManagerRunnerRegistry
XComp Dec 15, 2021
cb1ca1e
[FLINK-11813] Integrates JobManagerRunnerRegistry into Dispatcher
XComp Dec 15, 2021
288bda5
[FLINK-11813] Adds ResourceCleaner interface and default implementation
XComp Dec 15, 2021
a76cdda
[FLINK-11813] Refactors Dispatcher constructor signature
XComp Dec 15, 2021
19c2d60
[FLINK-11813] Moves JobManagerRunnerRegistry instantiation into separ…
XComp Dec 15, 2021
b8c34b3
[FLINK-11813] Integrates ResourceCleaner into Dispatcher
XComp Dec 15, 2021
e19ffcb
[FLINK-11813] Moves CheckpointRecoveryFactory initialization based on…
XComp Nov 26, 2021
6f7a018
[FLINK-11813] Adds CheckpointResourcesCleanupRunner
XComp Dec 9, 2021
fcb20a8
[FLINK-11813] Refactors createJobManagerRunner to be an initializing …
XComp Dec 15, 2021
ba5b891
[FLINK-11813] Integrates CheckpointResourcesCleanupRunner into Dispat…
XComp Dec 15, 2021
09bc067
[FLINK-11813] Simplifies code
XComp Dec 13, 2021
c513b0d
[hotfix] Makes FileSystemBlobStore.(delete|deleteAll) comply to the B…
XComp Dec 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public KubernetesCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobID, int maxNumberOfCheckpointsToRetain) throws Exception {

final String configMapName = getConfigMapNameFunction.apply(jobID);
return KubernetesUtils.createCompletedCheckpointStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.ShutdownHookUtil;

Expand Down Expand Up @@ -71,7 +74,12 @@
* the directory structure to store the BLOBs or temporarily cache them.
*/
public class BlobServer extends Thread
implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
implements BlobService,
BlobWriter,
PermanentBlobService,
TransientBlobService,
LocallyCleanableResource,
GloballyCleanableResource {

/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
Expand Down Expand Up @@ -824,46 +832,58 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) {
}

/**
* Removes all BLOBs from local and HA store belonging to the given job ID.
* Deletes locally stored artifacts for the job represented by the given {@link JobID}. This
* doesn't touch the job's entry in the {@link BlobStore} to enable recovering.
*
* @param jobId ID of the job this blob belongs to
* @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up
* as well. Otherwise false.
* @return <tt>true</tt> if the job directory is successfully deleted or non-existing;
* <tt>false</tt> otherwise
* @param jobId The {@code JobID} of the job that is subject to cleanup.
* @throws IOException if the cleanup failed.
*/
public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
@Override
public void localCleanup(JobID jobId) throws IOException {
    checkNotNull(jobId);

    // Path of the storage location derived from the local storage directory and the job ID.
    final String jobStoragePath =
            BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId);

    FileUtils.deleteDirectory(new File(jobStoragePath));

    // Why blobExpiryTimes is deliberately left untouched here:
    // Lingering entries are kept instead of walking through blobExpiryTimes. The
    // timer task cleans them up later and tolerates files that no longer exist.
    // If a BLOB is inserted again with the same IDs (via put()), its TTL is
    // simply updated again.
}

/**
* Removes all BLOBs from local and HA store belonging to the given {@link JobID}.
*
* @param jobId ID of the job this blob belongs to
* @throws Exception if the cleanup fails.
*/
@Override
public void globalCleanup(JobID jobId) throws Exception {
checkNotNull(jobId);

readWriteLock.writeLock().lock();

try {
// delete locally
boolean deletedLocally = false;
try {
FileUtils.deleteDirectory(jobDir);

// NOTE on why blobExpiryTimes are not cleaned up:
// Instead of going through blobExpiryTimes, keep lingering entries - they
// will be cleaned up by the timer task which tolerates non-existing files
// If inserted again with the same IDs (via put()), the TTL will be updated
// again.
Exception exception = null;

deletedLocally = true;
try {
localCleanup(jobId);
} catch (IOException e) {
LOG.warn(
"Failed to locally delete BLOB storage directory at "
+ jobDir.getAbsolutePath(),
e);
exception = e;
}

// delete in HA blob store files
final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId);
if (!blobStore.deleteAll(jobId)) {
exception =
ExceptionUtils.firstOrSuppressed(
new FlinkException(
"Error while cleaning up the BlobStore for job " + jobId),
exception);
}

return deletedLocally && deletedHA;
ExceptionUtils.tryRethrowException(exception);
} finally {
readWriteLock.writeLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,13 @@ private boolean delete(String blobPath) {

Path path = new Path(blobPath);

boolean result = fileSystem.delete(path, true);
boolean result = true;
if (fileSystem.exists(path)) {
result = fileSystem.delete(path, true);
} else {
LOG.debug(
"The given path {} is not present anymore. No deletion is required.", path);
}

// send a call to delete the directory containing the file. This will
// fail (and be ignored) when some files still exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

import org.slf4j.Logger;

/** A factory for per Job checkpoint recovery components. */
public interface CheckpointRecoveryFactory {
Expand All @@ -30,12 +34,43 @@ public interface CheckpointRecoveryFactory {
*
* @param jobId Job ID to recover checkpoints for
* @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception;
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception;

/**
 * Creates the {@link CompletedCheckpointStore} for a job, reading the maximum number of
 * checkpoints to retain from the passed {@code Configuration}.
 *
 * <p>A configured value that is not greater than 0 is reported as a warning through the passed
 * logger and replaced by the default value of {@link
 * CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}.
 *
 * @param jobId The {@code JobID} for which the {@code CompletedCheckpointStore} shall be
 *     created.
 * @param config The {@code Configuration} that shall be used (see {@link
 *     CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}).
 * @param logger The logger that receives the warning about an invalid setting.
 * @return The {@code CompletedCheckpointStore} instance for the given job.
 * @throws Exception if an error occurs while instantiating the {@code
 *     CompletedCheckpointStore}.
 */
default CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
        JobID jobId, Configuration config, Logger logger) throws Exception {
    final int configuredLimit =
            config.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

    // A positive setting is passed on exactly as configured.
    if (configuredLimit > 0) {
        return this.createRecoveredCompletedCheckpointStore(jobId, configuredLimit);
    }

    // The configured value is not greater than 0: warn and fall back to the option's
    // default value.
    logger.warn(
            "The setting for '{} : {}' is invalid. Using default value of {}",
            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
            configuredLimit,
            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

    final int fallbackLimit = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
    return this.createRecoveredCompletedCheckpointStore(jobId, fallbackLimit);
}

/**
* Creates a {@link CheckpointIDCounter} instance for a job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public PerJobCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
JobID jobId, int maxNumberOfCheckpointsToRetain) {
return store.compute(
jobId,
(key, previous) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception {

return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public ZooKeeperCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception {

return ZooKeeperUtils.createCompletedCheckpoints(
ZooKeeperUtils.useNamespaceAndEnsurePath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@
/** The status of an application. */
public enum ApplicationStatus {

/** Application finished successfully */
/** Application finished successfully. */
SUCCEEDED(0),

/** Application encountered an unrecoverable failure or error */
/** Application encountered an unrecoverable failure or error. */
FAILED(1443),

/** Application was canceled or killed on request */
/** Application was canceled or killed on request. */
CANCELED(0),

/** Application status is not known */
/** Application status is not known. */
UNKNOWN(1445);

// ------------------------------------------------------------------------
Expand All @@ -50,15 +50,15 @@ public enum ApplicationStatus {
JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
}

/** The associated process exit code */
/** The associated process exit code. */
private final int processExitCode;

private ApplicationStatus(int exitCode) {
ApplicationStatus(int exitCode) {
this.processExitCode = exitCode;
}

/**
* Gets the process exit code associated with this status
* Gets the process exit code associated with this status.
*
* @return The associated process exit code.
*/
Expand Down
Loading