diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java index f8a661f9a085f..2b39904cea375 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java @@ -71,8 +71,7 @@ public KubernetesCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobID, int maxNumberOfCheckpointsToRetain) throws Exception { final String configMapName = getConfigMapNameFunction.apply(jobID); return KubernetesUtils.createCompletedCheckpointStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 6989a52dd44f9..46bf4cf411239 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -25,9 +25,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.NetUtils; import org.apache.flink.util.ShutdownHookUtil; @@ -71,7 +74,12 @@ * the directory structure to store 
the BLOBs or temporarily cache them. */ public class BlobServer extends Thread - implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService { + implements BlobService, + BlobWriter, + PermanentBlobService, + TransientBlobService, + LocallyCleanableResource, + GloballyCleanableResource { /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class); @@ -824,46 +832,58 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { } /** - * Removes all BLOBs from local and HA store belonging to the given job ID. + * Deletes locally stored artifacts for the job represented by the given {@link JobID}. This + * doesn't touch the job's entry in the {@link BlobStore} to enable recovering. * - * @param jobId ID of the job this blob belongs to - * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up - * as well. Otherwise false. - * @return true if the job directory is successfully deleted or non-existing; - * false otherwise + * @param jobId The {@code JobID} of the job that is subject to cleanup. + * @throws IOException if the cleanup failed. */ - public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { + @Override + public void localCleanup(JobID jobId) throws IOException { checkNotNull(jobId); final File jobDir = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId)); + FileUtils.deleteDirectory(jobDir); + + // NOTE on why blobExpiryTimes are not cleaned up: + // Instead of going through blobExpiryTimes, keep lingering entries - they + // will be cleaned up by the timer task which tolerates non-existing files + // If inserted again with the same IDs (via put()), the TTL will be updated + // again. + } + + /** + * Removes all BLOBs from local and HA store belonging to the given {@link JobID}. + * + * @param jobId ID of the job this blob belongs to + * @throws Exception if the cleanup fails. 
+ */ + @Override + public void globalCleanup(JobID jobId) throws Exception { + checkNotNull(jobId); + readWriteLock.writeLock().lock(); try { - // delete locally - boolean deletedLocally = false; - try { - FileUtils.deleteDirectory(jobDir); - - // NOTE on why blobExpiryTimes are not cleaned up: - // Instead of going through blobExpiryTimes, keep lingering entries - they - // will be cleaned up by the timer task which tolerates non-existing files - // If inserted again with the same IDs (via put()), the TTL will be updated - // again. + Exception exception = null; - deletedLocally = true; + try { + localCleanup(jobId); } catch (IOException e) { - LOG.warn( - "Failed to locally delete BLOB storage directory at " - + jobDir.getAbsolutePath(), - e); + exception = e; } - // delete in HA blob store files - final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId); + if (!blobStore.deleteAll(jobId)) { + exception = + ExceptionUtils.firstOrSuppressed( + new FlinkException( + "Error while cleaning up the BlobStore for job " + jobId), + exception); + } - return deletedLocally && deletedHA; + ExceptionUtils.tryRethrowException(exception); } finally { readWriteLock.writeLock().unlock(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index a5dfff8e2c729..442c1715344db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -156,7 +156,13 @@ private boolean delete(String blobPath) { Path path = new Path(blobPath); - boolean result = fileSystem.delete(path, true); + boolean result = true; + if (fileSystem.exists(path)) { + result = fileSystem.delete(path, true); + } else { + LOG.debug( + "The given path {} is not present anymore. 
No deletion is required.", path); + } // send a call to delete the directory containing the file. This will // fail (and be ignored) when some files still exist. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java index 88c2a5689d604..81f54b1687965 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -19,6 +19,10 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; /** A factory for per Job checkpoint recovery components. */ public interface CheckpointRecoveryFactory { @@ -30,12 +34,43 @@ public interface CheckpointRecoveryFactory { * * @param jobId Job ID to recover checkpoints for * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain - * @param userClassLoader User code class loader of the job * @return {@link CompletedCheckpointStore} instance for the job */ CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception; + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception; + + /** + * Instantiates the {@link CompletedCheckpointStore} based on the passed {@code Configuration}. + * + * @param jobId The {@code JobID} for which the {@code CompletedCheckpointStore} shall be + * created. + * @param config The {@code Configuration} that shall be used (see {@link + * CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}. + * @param logger The logger that shall be used internally. 
+ * @return The {@code CompletedCheckpointStore} instance for the given {@code Job}. + * @throws Exception if an error occurs while instantiating the {@code + * CompletedCheckpointStore}. + */ + default CompletedCheckpointStore createRecoveredCompletedCheckpointStore( + JobID jobId, Configuration config, Logger logger) throws Exception { + int maxNumberOfCheckpointsToRetain = + config.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); + + if (maxNumberOfCheckpointsToRetain <= 0) { + // warning and use 1 as the default value if the setting in + // state.checkpoints.max-retained-checkpoints is not greater than 0. + logger.warn( + "The setting for '{} : {}' is invalid. Using default value of {}", + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), + maxNumberOfCheckpointsToRetain, + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); + + maxNumberOfCheckpointsToRetain = + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); + } + + return this.createRecoveredCompletedCheckpointStore(jobId, maxNumberOfCheckpointsToRetain); + } /** * Creates a {@link CheckpointIDCounter} instance for a job. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java index 5b91bb8974ae7..02b3e95336f6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java @@ -68,7 +68,7 @@ public PerJobCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { + JobID jobId, int maxNumberOfCheckpointsToRetain) { return store.compute( jobId, (key, previous) -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index c323256ca3d86..8ad1849f03bd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -26,8 +26,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 6d66594739f1d..e2fbaa17b5595 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -47,8 +47,7 @@ public ZooKeeperCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { return ZooKeeperUtils.createCompletedCheckpoints( ZooKeeperUtils.useNamespaceAndEnsurePath( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java index b38b8ceaffa77..eef90969046d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java @@ -26,16 +26,16 @@ /** The status of an application. */ public enum ApplicationStatus { - /** Application finished successfully */ + /** Application finished successfully. */ SUCCEEDED(0), - /** Application encountered an unrecoverable failure or error */ + /** Application encountered an unrecoverable failure or error. */ FAILED(1443), - /** Application was canceled or killed on request */ + /** Application was canceled or killed on request. */ CANCELED(0), - /** Application status is not known */ + /** Application status is not known. */ UNKNOWN(1445); // ------------------------------------------------------------------------ @@ -50,15 +50,15 @@ public enum ApplicationStatus { JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED); } - /** The associated process exit code */ + /** The associated process exit code. 
*/ private final int processExitCode; - private ApplicationStatus(int exitCode) { + ApplicationStatus(int exitCode) { this.processExitCode = exitCode; } /** - * Gets the process exit code associated with this status + * Gets the process exit code associated with this status. * * @return The associated process exit code. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 33b8c530b1208..4a648cac1d9e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -27,10 +27,14 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.Checkpoints; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -40,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.jobmaster.CheckpointResourcesCleanupRunner; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import 
org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; @@ -66,6 +71,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -87,6 +93,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -118,7 +125,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint runningJobs; + private final JobManagerRunnerRegistry jobManagerRunnerRegistry; private final Collection recoveredJobs; @@ -146,6 +153,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint recoveredJobs, + Collection globallyTerminatedJobs, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherServices dispatcherServices, + JobManagerRunnerRegistry jobManagerRunnerRegistry) + throws Exception { + this( + rpcService, + fencingToken, + recoveredJobs, + globallyTerminatedJobs, + dispatcherServices.getConfiguration(), + dispatcherServices.getHighAvailabilityServices(), + dispatcherServices.getResourceManagerGatewayRetriever(), + dispatcherServices.getHeartbeatServices(), + dispatcherServices.getBlobServer(), + dispatcherServices.getFatalErrorHandler(), + dispatcherServices.getJobGraphWriter(), + dispatcherServices.getJobResultStore(), + dispatcherServices.getJobManagerMetricGroup(), + dispatcherServices.getMetricQueryServiceAddress(), + dispatcherServices.getIoExecutor(), + dispatcherServices.getHistoryServerArchivist(), + dispatcherServices.getArchivedExecutionGraphStore(), + 
dispatcherServices.getJobManagerRunnerFactory(), + dispatcherBootstrapFactory, + dispatcherServices.getOperationCaches(), + jobManagerRunnerRegistry, + new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices)); + } + + private Dispatcher( + RpcService rpcService, + DispatcherId fencingToken, + Collection recoveredJobs, + Collection globallyTerminatedJobs, + Configuration configuration, + HighAvailabilityServices highAvailabilityServices, + GatewayRetriever resourceManagerGatewayRetriever, + HeartbeatServices heartbeatServices, + BlobServer blobServer, + FatalErrorHandler fatalErrorHandler, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore, + JobManagerMetricGroup jobManagerMetricGroup, + String metricServiceQueryAddress, + Executor ioExecutor, + HistoryServerArchivist historyServerArchivist, + ExecutionGraphInfoStore executionGraphInfoStore, + JobManagerRunnerFactory jobManagerRunnerFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherOperationCaches dispatcherOperationCaches, + JobManagerRunnerRegistry jobManagerRunnerRegistry, + ResourceCleanerFactory resourceCleanerFactory) + throws Exception { super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken); - checkNotNull(dispatcherServices); - - this.configuration = dispatcherServices.getConfiguration(); - this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices(); - this.resourceManagerGatewayRetriever = - dispatcherServices.getResourceManagerGatewayRetriever(); - this.heartbeatServices = dispatcherServices.getHeartbeatServices(); - this.blobServer = dispatcherServices.getBlobServer(); - this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler(); - this.jobGraphWriter = dispatcherServices.getJobGraphWriter(); - this.jobResultStore = dispatcherServices.getJobResultStore(); - this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup(); - this.metricServiceQueryAddress = 
dispatcherServices.getMetricQueryServiceAddress(); - this.ioExecutor = dispatcherServices.getIoExecutor(); + + this.configuration = checkNotNull(configuration); + this.highAvailabilityServices = checkNotNull(highAvailabilityServices); + this.resourceManagerGatewayRetriever = checkNotNull(resourceManagerGatewayRetriever); + this.heartbeatServices = checkNotNull(heartbeatServices); + this.blobServer = checkNotNull(blobServer); + this.fatalErrorHandler = checkNotNull(fatalErrorHandler); + this.jobGraphWriter = checkNotNull(jobGraphWriter); + this.jobResultStore = checkNotNull(jobResultStore); + this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + this.metricServiceQueryAddress = checkNotNull(metricServiceQueryAddress); + this.ioExecutor = checkNotNull(ioExecutor); this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( configuration, blobServer, fatalErrorHandler); - runningJobs = new HashMap<>(16); + this.jobManagerRunnerRegistry = checkNotNull(jobManagerRunnerRegistry); - this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist(); + this.historyServerArchivist = checkNotNull(historyServerArchivist); - this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore(); + this.executionGraphInfoStore = checkNotNull(executionGraphInfoStore); - this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory(); + this.jobManagerRunnerFactory = checkNotNull(jobManagerRunnerFactory); this.jobManagerRunnerTerminationFutures = new HashMap<>(2); @@ -200,9 +276,12 @@ public Dispatcher( this.dispatcherCachedOperationsHandler = new DispatcherCachedOperationsHandler( - dispatcherServices.getOperationCaches(), + checkNotNull(dispatcherOperationCaches), this::triggerSavepointAndGetLocation, this::stopWithSavepointAndGetLocation); + + this.localResourceCleaner = resourceCleanerFactory.createLocalResourceCleaner(); + this.globalResourceCleaner = resourceCleanerFactory.createGlobalResourceCleaner(); } // 
------------------------------------------------------ @@ -230,6 +309,8 @@ public void onStart() throws Exception { } startRecoveredJobs(); + startCleanupRetries(); + this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create( getSelfGateway(DispatcherGateway.class), @@ -255,7 +336,7 @@ private void startRecoveredJobs() { private void runRecoveredJob(final JobGraph recoveredJob) { checkNotNull(recoveredJob); try { - runJob(recoveredJob, ExecutionType.RECOVERY); + initializeAndStartJobManagerRunner(recoveredJob, ExecutionType.RECOVERY); } catch (Throwable throwable) { onFatalError( new DispatcherException( @@ -265,6 +346,26 @@ private void runRecoveredJob(final JobGraph recoveredJob) { } } + private void startCleanupRetries() { + globallyTerminatedJobs.forEach(this::runCleanupRetry); + globallyTerminatedJobs.clear(); + } + + private void runCleanupRetry(final JobResult jobResult) { + checkNotNull(jobResult); + + try { + initializeAndStartCheckpointJobDataCleanupRunner(jobResult); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not start cleanup retry for job %s.", + jobResult.getJobId()), + throwable)); + } + } + private void handleStartDispatcherServicesException(Exception e) throws Exception { try { stopDispatcherServices(); @@ -345,7 +446,7 @@ public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) * @throws FlinkException if the job scheduling status cannot be retrieved */ private boolean isDuplicateJob(JobID jobId) throws FlinkException { - return isInGloballyTerminalState(jobId) || runningJobs.containsKey(jobId); + return isInGloballyTerminalState(jobId) || jobManagerRunnerRegistry.isRegistered(jobId); } /** @@ -394,7 +495,16 @@ private CompletableFuture internalSubmitJob(JobGraph jobGraph) { return persistAndRunFuture.handleAsync( (acknowledge, throwable) -> { if (throwable != null) { - cleanUpHighAvailabilityJobData(jobGraph.getJobID()); + globalResourceCleaner + 
.cleanupAsync(jobGraph.getJobID()) + .exceptionally( + t -> { + log.warn( + "Cleanup didn't succeed after job submission failed for job " + + jobGraph.getJobID(), + t); + return null; + }); ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable); final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); @@ -414,18 +524,30 @@ private CompletableFuture internalSubmitJob(JobGraph jobGraph) { private void persistAndRunJob(JobGraph jobGraph) throws Exception { jobGraphWriter.putJobGraph(jobGraph); - runJob(jobGraph, ExecutionType.SUBMISSION); + initializeAndStartJobManagerRunner(jobGraph, ExecutionType.SUBMISSION); } - private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception { - Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID())); - long initializationTimestamp = System.currentTimeMillis(); - JobManagerRunner jobManagerRunner = - createJobManagerRunner(jobGraph, initializationTimestamp); + private void initializeAndStartJobManagerRunner(JobGraph jobGraph, ExecutionType executionType) + throws Exception { + Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID())); + final JobManagerRunner jobManagerRunner = initializeJobManagerRunner(jobGraph); + runJob(jobManagerRunner, executionType); + } - runningJobs.put(jobGraph.getJobID(), jobManagerRunner); + private void initializeAndStartCheckpointJobDataCleanupRunner(JobResult jobResult) + throws Exception { + Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobResult.getJobId())); + final JobManagerRunner checkpointJobDataCleanupRunner = + initializeCheckpointJobDataCleanupRunner(jobResult); + runJob(checkpointJobDataCleanupRunner, ExecutionType.RECOVERY); + } + + private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType) + throws Exception { + jobManagerRunner.start(); + jobManagerRunnerRegistry.register(jobManagerRunner); - final JobID jobId = 
jobGraph.getJobID(); + final JobID jobId = jobManagerRunner.getJobID(); final CompletableFuture cleanupJobStateFuture = jobManagerRunner @@ -433,7 +555,7 @@ private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Excep .handleAsync( (jobManagerRunnerResult, throwable) -> { Preconditions.checkState( - runningJobs.get(jobId) == jobManagerRunner, + jobManagerRunnerRegistry.get(jobId) == jobManagerRunner, "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner."); if (jobManagerRunnerResult != null) { @@ -446,9 +568,8 @@ private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Excep getMainThreadExecutor()); final CompletableFuture jobTerminationFuture = - cleanupJobStateFuture - .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState)) - .thenCompose(Function.identity()); + cleanupJobStateFuture.thenCompose( + cleanupJobState -> removeJob(jobId, cleanupJobState)); FutureUtils.handleUncaughtException( jobTerminationFuture, @@ -458,28 +579,18 @@ private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Excep private CleanupJobState handleJobManagerRunnerResult( JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) { - if (jobManagerRunnerResult.isInitializationFailure()) { - if (executionType == ExecutionType.RECOVERY) { - return jobManagerRunnerFailed( - jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), - jobManagerRunnerResult.getInitializationFailure()); - } else { - return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); - } - } else { - return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); + if (jobManagerRunnerResult.isInitializationFailure() + && executionType == ExecutionType.RECOVERY) { + return jobManagerRunnerFailed( + jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), + jobManagerRunnerResult.getInitializationFailure()); } + return 
jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); } enum CleanupJobState { - LOCAL(false), - GLOBAL(true); - - final boolean cleanupHAData; - - CleanupJobState(boolean cleanupHAData) { - this.cleanupHAData = cleanupHAData; - } + LOCAL, + GLOBAL } private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) { @@ -487,29 +598,37 @@ private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) return CleanupJobState.LOCAL; } - JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) - throws Exception { + JobManagerRunner initializeJobManagerRunner(JobGraph jobGraph) throws Exception { final RpcService rpcService = getRpcService(); - JobManagerRunner runner = - jobManagerRunnerFactory.createJobManagerRunner( - jobGraph, - configuration, - rpcService, - highAvailabilityServices, - heartbeatServices, - jobManagerSharedServices, - new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), - fatalErrorHandler, - initializationTimestamp); - runner.start(); - return runner; + return jobManagerRunnerFactory.createJobManagerRunner( + jobGraph, + configuration, + rpcService, + highAvailabilityServices, + heartbeatServices, + jobManagerSharedServices, + new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), + fatalErrorHandler, + System.currentTimeMillis()); + } + + JobManagerRunner initializeCheckpointJobDataCleanupRunner(JobResult jobResult) + throws Exception { + return new CheckpointResourcesCleanupRunner( + jobResult, + highAvailabilityServices.getCheckpointRecoveryFactory(), + new CheckpointsCleaner(), + SharedStateRegistry.DEFAULT_FACTORY, + configuration, + ioExecutor, + System.currentTimeMillis()); } @Override public CompletableFuture> listJobs(Time timeout) { return CompletableFuture.completedFuture( - Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet()))); + Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds())); } @Override 
@@ -655,7 +774,7 @@ public CompletableFuture requestExecutionGraphInfo( @Override public CompletableFuture requestJobResult(JobID jobId, Time timeout) { - JobManagerRunner job = runningJobs.get(jobId); + JobManagerRunner job = jobManagerRunnerRegistry.get(jobId); if (job == null) { final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); @@ -802,82 +921,31 @@ private void registerJobManagerRunnerTerminationFuture( } private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJobState) { - final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId)); - return CompletableFuture.supplyAsync( - () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor) - .thenCompose( - jobGraphRemoved -> job.closeAsync().thenApply(ignored -> jobGraphRemoved)) - .thenAcceptAsync( - jobGraphRemoved -> { - cleanUpRemainingJobData(jobId, jobGraphRemoved); - cleanUpJobResult(jobId, jobGraphRemoved); - }, - ioExecutor); - } - - /** - * Clean up job graph from {@link org.apache.flink.runtime.jobmanager.JobGraphStore}. - * - * @param jobId Reference to the job that we want to clean. - * @param cleanupHA Flag signalling whether we should remove (we're done with the job) or just - * release the job graph. - * @return True if we have removed the job graph. This means we can clean other HA-related - * services as well. 
- */ - private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { - if (cleanupHA) { - try { - jobGraphWriter.removeJobGraph(jobId); - return true; - } catch (Exception e) { - log.warn( - "Could not properly remove job {} from submitted job graph store.", - jobId, - e); - return false; - } - } - try { - jobGraphWriter.releaseJobGraph(jobId); - } catch (Exception e) { - log.warn("Could not properly release job {} from submitted job graph store.", jobId, e); - } - return false; - } - - private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { - jobManagerMetricGroup.removeJob(jobId); - if (jobGraphRemoved) { - try { - highAvailabilityServices.cleanupJobData(jobId); - } catch (Exception e) { - log.warn( - "Could not properly clean data for job {} stored by ha services", jobId, e); - } + switch (cleanupJobState) { + case LOCAL: + return localResourceCleaner.cleanupAsync(jobId); + case GLOBAL: + return globalResourceCleaner + .cleanupAsync(jobId) + .thenRun(() -> cleanUpJobResult(jobId)); + default: + throw new IllegalStateException("Invalid cleanup state: " + cleanupJobState); } - blobServer.cleanupJob(jobId, jobGraphRemoved); } - private void cleanUpJobResult(JobID jobId, boolean jobGraphRemoved) { - if (jobGraphRemoved) { - try { - jobResultStore.markResultAsClean(jobId); - } catch (IOException e) { - log.warn("Could not properly mark job {} result as clean.", jobId, e); - } + private void cleanUpJobResult(JobID jobId) { + try { + jobResultStore.markResultAsClean(jobId); + } catch (IOException e) { + log.warn("Could not properly mark job {} result as clean.", jobId, e); } } - private void cleanUpHighAvailabilityJobData(JobID jobId) { - final boolean jobGraphRemoved = cleanUpJobGraph(jobId, true); - cleanUpRemainingJobData(jobId, jobGraphRemoved); - } - /** Terminate all currently running {@link JobManagerRunner}s. 
*/ private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); - final HashSet jobsToRemove = new HashSet<>(runningJobs.keySet()); + final Set jobsToRemove = jobManagerRunnerRegistry.getRunningJobIds(); for (JobID jobId : jobsToRemove) { terminateJob(jobId); @@ -885,7 +953,7 @@ private void terminateRunningJobs() { } private void terminateJob(JobID jobId) { - final JobManagerRunner jobManagerRunner = runningJobs.get(jobId); + final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId); if (jobManagerRunner != null) { jobManagerRunner.closeAsync(); @@ -988,7 +1056,7 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { /** Ensures that the JobMasterGateway is available. */ private CompletableFuture getJobMasterGateway(JobID jobId) { - JobManagerRunner job = runningJobs.get(jobId); + JobManagerRunner job = jobManagerRunnerRegistry.get(jobId); if (job == null) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } @@ -1012,7 +1080,7 @@ private CompletableFuture getResourceManagerGateway() { } private Optional getJobManagerRunner(JobID jobId) { - return Optional.ofNullable(runningJobs.get(jobId)); + return Optional.ofNullable(jobManagerRunnerRegistry.get(jobId)); } private CompletableFuture runResourceManagerCommand( @@ -1034,9 +1102,9 @@ private List>> queryJobMastersForInformation( Function> queryFunction) { List>> optionalJobInformation = - new ArrayList<>(runningJobs.size()); + new ArrayList<>(jobManagerRunnerRegistry.size()); - for (JobManagerRunner job : runningJobs.values()) { + for (JobManagerRunner job : jobManagerRunnerRegistry.getJobManagerRunners()) { final CompletableFuture> queryResult = queryFunction .apply(job) @@ -1070,7 +1138,7 @@ private CompletableFuture waitForTerminatingJob( } CompletableFuture getJobTerminationFuture(JobID jobId) { - if (runningJobs.containsKey(jobId)) { + if (jobManagerRunnerRegistry.isRegistered(jobId)) { return 
FutureUtils.completedExceptionally( new DispatcherException( String.format("Job with job id %s is still running.", jobId))); @@ -1081,7 +1149,8 @@ CompletableFuture getJobTerminationFuture(JobID jobId) { } private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) { - jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long) runningJobs.size()); + jobManagerMetricGroup.gauge( + MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size()); } public CompletableFuture onRemovedJobGraph(JobID jobId) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java new file mode 100644 index 0000000000000..be3c9943ac604 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** {@code JobManagerRunnerRegistry} collects the {@link JobManagerRunner}s of running jobs. */ +public class JobManagerRunnerRegistry + implements LocallyCleanableResource, GloballyCleanableResource { + + @VisibleForTesting final Map jobManagerRunners; + + public JobManagerRunnerRegistry(int initialCapacity) { + Preconditions.checkArgument(initialCapacity > 0); + jobManagerRunners = new HashMap<>(initialCapacity); + } + + public boolean isRegistered(JobID jobId) { + return jobManagerRunners.containsKey(jobId); + } + + public void register(JobManagerRunner jobManagerRunner) { + Preconditions.checkArgument( + !isRegistered(jobManagerRunner.getJobID()), + "A job with the ID %s is already registered.", + jobManagerRunner.getJobID()); + this.jobManagerRunners.put(jobManagerRunner.getJobID(), jobManagerRunner); + } + + /** + * Returns the {@link JobManagerRunner} for the given {@code JobID}. + * + * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered + * {@code JobManagerRunner}.
+ * @see #isRegistered(JobID) + */ + public JobManagerRunner get(JobID jobId) { + assertJobRegistered(jobId); + return this.jobManagerRunners.get(jobId); + } + + public int size() { + return this.jobManagerRunners.size(); + } + + public Set getRunningJobIds() { + return this.jobManagerRunners.keySet(); + } + + public Collection getJobManagerRunners() { + return this.jobManagerRunners.values(); + } + + @Override + public void globalCleanup(JobID jobId) throws Exception { + cleanup(jobId); + } + + @Override + public CompletableFuture globalCleanupAsync(JobID jobId, Executor unusedExecutor) { + return cleanupAsync(jobId); + } + + @Override + public void localCleanup(JobID jobId) throws Exception { + cleanup(jobId); + } + + @Override + public CompletableFuture localCleanupAsync(JobID jobId, Executor unusedExecutor) { + return cleanupAsync(jobId); + } + + private void cleanup(JobID jobId) throws Exception { + unregister(jobId).close(); + } + + private CompletableFuture cleanupAsync(JobID jobId) { + return unregister(jobId).closeAsync(); + } + + public JobManagerRunner unregister(JobID jobId) { + assertJobRegistered(jobId); + return this.jobManagerRunners.remove(jobId); + } + + private void assertJobRegistered(JobID jobId) { + if (!isRegistered(jobId)) { + throw new NoSuchElementException( + "There is no running job registered for the job ID " + jobId); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java index f2c450abf868d..b063b4bcad505 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java @@ -64,16 +64,6 @@ public void putJobGraph(JobGraph jobGraph) throws Exception { } } - @Override - public void removeJobGraph(JobID jobId) { - // ignore - } - - @Override - public 
void releaseJobGraph(JobID jobId) { - // ignore - } - @Override public Collection getJobIds() { return Collections.singleton(jobGraph.getJobID()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java new file mode 100644 index 0000000000000..4ef1bc1d156d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. 
*/ +public class DefaultResourceCleaner implements ResourceCleaner { + + private final Collection>> + jobRelatedCleanups = new ArrayList<>(); + private final Executor cleanupExecutor; + + DefaultResourceCleaner(Executor cleanupExecutor) { + this.cleanupExecutor = cleanupExecutor; + } + + DefaultResourceCleaner withCleanupOf( + BiFunction> cleanupCallback) { + jobRelatedCleanups.add(cleanupCallback); + return this; + } + + @Override + public CompletableFuture cleanupAsync(JobID jobId) { + return FutureUtils.completeAll( + jobRelatedCleanups.stream() + .map(c -> c.apply(jobId, cleanupExecutor)) + .collect(Collectors.toList())); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java new file mode 100644 index 0000000000000..75f19bdfce6d1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.concurrent.Executor; + +/** + * {@code DispatcherResourceCleanerFactory} instantiates {@link ResourceCleaner} instances that + * clean cleanable resources from the {@link org.apache.flink.runtime.dispatcher.Dispatcher}. + */ +public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory { + + private final Executor cleanupExecutor; + private final JobManagerRunnerRegistry jobManagerRunnerRegistry; + private final JobGraphWriter jobGraphWriter; + private final BlobServer blobServer; + private final HighAvailabilityServices highAvailabilityServices; + private final JobManagerMetricGroup jobManagerMetricGroup; + + public DispatcherResourceCleanerFactory( + JobManagerRunnerRegistry jobManagerRunnerRegistry, + DispatcherServices dispatcherServices) { + this( + dispatcherServices.getIoExecutor(), + jobManagerRunnerRegistry, + dispatcherServices.getJobGraphWriter(), + dispatcherServices.getBlobServer(), + dispatcherServices.getHighAvailabilityServices(), + dispatcherServices.getJobManagerMetricGroup()); + } + + private DispatcherResourceCleanerFactory( + Executor cleanupExecutor, + JobManagerRunnerRegistry jobManagerRunnerRegistry, + JobGraphWriter jobGraphWriter, + BlobServer blobServer, + HighAvailabilityServices highAvailabilityServices, + JobManagerMetricGroup jobManagerMetricGroup) { + this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor); + this.jobManagerRunnerRegistry = 
Preconditions.checkNotNull(jobManagerRunnerRegistry); + this.jobGraphWriter = Preconditions.checkNotNull(jobGraphWriter); + this.blobServer = Preconditions.checkNotNull(blobServer); + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup); + } + + public ResourceCleaner createLocalResourceCleaner() { + final DefaultResourceCleaner resourceCleaner = new DefaultResourceCleaner(cleanupExecutor); + for (LocallyCleanableResource locallyCleanableResource : + Arrays.asList( + jobManagerRunnerRegistry, + jobGraphWriter, + blobServer, + highAvailabilityServices, + jobManagerMetricGroup)) { + resourceCleaner.withCleanupOf(locallyCleanableResource::localCleanupAsync); + } + + return resourceCleaner; + } + + public ResourceCleaner createGlobalResourceCleaner() { + final DefaultResourceCleaner resourceCleaner = new DefaultResourceCleaner(cleanupExecutor); + for (GloballyCleanableResource locallyCleanableResource : + Arrays.asList( + jobManagerRunnerRegistry, + jobGraphWriter, + blobServer, + highAvailabilityServices, + jobManagerMetricGroup)) { + resourceCleaner.withCleanupOf(locallyCleanableResource::globalCleanupAsync); + } + + return resourceCleaner; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java new file mode 100644 index 0000000000000..80047dbe9856f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up globally. + * + * @see LocallyCleanableResource + */ +public interface GloballyCleanableResource { + + void globalCleanup(JobID jobId) throws Exception; + + default CompletableFuture globalCleanupAsync(JobID jobId, Executor cleanupExecutor) { + return CompletableFuture.runAsync( + () -> { + try { + globalCleanup(jobId); + } catch (Exception e) { + throw new CompletionException("Asynchronous global cleanup failed", e); + } + }, + cleanupExecutor); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java new file mode 100644 index 0000000000000..27d9c40527b76 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up locally. 
+ * + * @see GloballyCleanableResource + */ +public interface LocallyCleanableResource { + + void localCleanup(JobID jobId) throws Exception; + + default CompletableFuture localCleanupAsync(JobID jobId, Executor cleanupExecutor) { + return CompletableFuture.runAsync( + () -> { + try { + localCleanup(jobId); + } catch (Exception e) { + throw new CompletionException("Asynchronous local cleanup failed", e); + } + }, + cleanupExecutor); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java new file mode 100644 index 0000000000000..da32346c0605f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; + +/** {@code ResourceCleaner} asynchronously cleans up resources for the given {@code JobID}.
*/ +public interface ResourceCleaner { + + CompletableFuture cleanupAsync(JobID jobId); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java new file mode 100644 index 0000000000000..91584dc0a2e31 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.Executor; + +/** + * {@code ResourceCleanerFactory} provides methods to create {@link ResourceCleaner} for local and + * global cleanup. + * + * @see GloballyCleanableResource + * @see LocallyCleanableResource + */ +public interface ResourceCleanerFactory { + + /** + * Creates {@link ResourceCleaner} that initiates {@link + * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} calls. 
+ */ + ResourceCleaner createLocalResourceCleaner(); + + /** + * Creates {@link ResourceCleaner} that initiates {@link + * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} calls. + */ + ResourceCleaner createGlobalResourceCleaner(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index c0b8795793c63..638afc16ac441 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -337,7 +337,7 @@ public static ArchivedExecutionGraph createFrom( * Create a sparse ArchivedExecutionGraph for a job while it is still initializing. Most fields * will be empty, only job status and error-related fields are set. */ - public static ArchivedExecutionGraph createFromInitializingJob( + public static ArchivedExecutionGraph createSparseArchivedExecutionGraph( JobID jobId, String jobName, JobStatus jobStatus, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index 13bc4586217d6..9b8e711136ad7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -206,7 +206,7 @@ public void closeAndCleanupAllData() throws Exception { } @Override - public void cleanupJobData(JobID jobID) throws Exception { + public void globalCleanup(JobID jobID) throws Exception { logger.info("Clean up the high availability data for job {}.", jobID); internalCleanupJobData(jobID); logger.info("Finished cleaning up the high availability data for job {}.", jobID); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 8df9227ce22b6..672973239ca72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -43,7 +45,10 @@ *
  • Naming of RPC endpoints * */ -public interface HighAvailabilityServices extends ClientHighAvailabilityServices { +public interface HighAvailabilityServices + extends ClientHighAvailabilityServices, + LocallyCleanableResource, + GloballyCleanableResource { // ------------------------------------------------------------------------ // Constants @@ -239,11 +244,9 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { */ void closeAndCleanupAllData() throws Exception; - /** - * Deletes all data for specified job stored by these services in external stores. - * - * @param jobID The identifier of the job to cleanup. - * @throws Exception Thrown, if an exception occurred while cleaning data stored by them. - */ - default void cleanupJobData(JobID jobID) throws Exception {} + @Override + default void globalCleanup(JobID jobId) throws Exception {} + + @Override + default void localCleanup(JobID jobId) throws Exception {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java index 832f9c92baf97..5416bdae4261e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java @@ -238,7 +238,7 @@ public void putJobGraph(JobGraph jobGraph) throws Exception { } @Override - public void removeJobGraph(JobID jobId) throws Exception { + public void globalCleanup(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); String name = jobGraphStoreUtil.jobIDToName(jobId); @@ -262,7 +262,7 @@ public void removeJobGraph(JobID jobId) throws Exception { } @Override - public void releaseJobGraph(JobID jobId) throws Exception { + public void localCleanup(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); LOG.debug("Releasing job graph {} from {}.", jobId, 
jobGraphStateHandleStore); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java index 23542a330b923..63d644a813c07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java @@ -19,10 +19,12 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.jobgraph.JobGraph; /** Allows to store and remove job graphs. */ -public interface JobGraphWriter { +public interface JobGraphWriter extends LocallyCleanableResource, GloballyCleanableResource { /** * Adds the {@link JobGraph} instance. * @@ -30,9 +32,6 @@ public interface JobGraphWriter { */ void putJobGraph(JobGraph jobGraph) throws Exception; - /** Removes the {@link JobGraph} with the given {@link JobID} if it exists. */ - void removeJobGraph(JobID jobId) throws Exception; - /** * Releases the locks on the specified {@link JobGraph}. 
* @@ -42,5 +41,9 @@ public interface JobGraphWriter { * @param jobId specifying the job to release the locks for * @throws Exception if the locks cannot be released */ - void releaseJobGraph(JobID jobId) throws Exception; + @Override + default void localCleanup(JobID jobId) throws Exception {} + + @Override + default void globalCleanup(JobID jobId) throws Exception {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java index 656df2ffd3607..f7d8135136aa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java @@ -47,16 +47,6 @@ public void putJobGraph(JobGraph jobGraph) { // Nothing to do } - @Override - public void removeJobGraph(JobID jobId) { - // Nothing to do - } - - @Override - public void releaseJobGraph(JobID jobId) { - // nothing to do - } - @Override public Collection getJobIds() { return Collections.emptyList(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java index 224b7fbd224f5..8bf9aa0c45d2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; /** {@link JobGraphWriter} implementation which does not allow to store {@link JobGraph}. 
*/ @@ -29,10 +28,4 @@ public enum ThrowingJobGraphWriter implements JobGraphWriter { public void putJobGraph(JobGraph jobGraph) { throw new UnsupportedOperationException("Cannot store job graphs."); } - - @Override - public void removeJobGraph(JobID jobId) {} - - @Override - public void releaseJobGraph(JobID jobId) {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/CheckpointResourcesCleanupRunner.java new file mode 100644 index 0000000000000..6c026cc5cb978 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/CheckpointResourcesCleanupRunner.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code CheckpointResourcesCleanupRunner} implements {@link JobManagerRunner} in a way that only + * the checkpoint-related resources are instantiated. It triggers any job-specific cleanup that's + * usually performed by the {@link JobMaster} without rebuilding the corresponding {@link + * org.apache.flink.runtime.executiongraph.ExecutionGraph}. 
+ */ +public class CheckpointResourcesCleanupRunner implements JobManagerRunner { + + private static final Logger LOG = + LoggerFactory.getLogger(CheckpointResourcesCleanupRunner.class); + + private final JobResult jobResult; + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + private final CheckpointsCleaner checkpointsCleaner; + private final SharedStateRegistryFactory sharedStateRegistryFactory; + private final Configuration jobManagerConfiguration; + private final Executor cleanupExecutor; + + private final long initializationTimestamp; + + private CompletableFuture resultFuture; + + private CompletedCheckpointStore completedCheckpointStore; + private CheckpointIDCounter checkpointIDCounter; + + public CheckpointResourcesCleanupRunner( + JobResult jobResult, + CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, + SharedStateRegistryFactory sharedStateRegistryFactory, + Configuration jobManagerConfiguration, + Executor cleanupExecutor, + long initializationTimestamp) { + this.jobResult = Preconditions.checkNotNull(jobResult); + this.checkpointRecoveryFactory = Preconditions.checkNotNull(checkpointRecoveryFactory); + this.checkpointsCleaner = Preconditions.checkNotNull(checkpointsCleaner); + this.sharedStateRegistryFactory = Preconditions.checkNotNull(sharedStateRegistryFactory); + this.jobManagerConfiguration = Preconditions.checkNotNull(jobManagerConfiguration); + this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor); + this.initializationTimestamp = initializationTimestamp; + } + + @Override + public CompletableFuture closeAsync() { + return resultFuture.thenApply( + result -> { + Exception exception = null; + try { + completedCheckpointStore.shutdown(getJobStatus(), checkpointsCleaner); + } catch (Exception e) { + exception = e; + } + + try { + checkpointIDCounter.shutdown(getJobStatus()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if 
(exception != null) { + throw new CompletionException(exception); + } + + return null; + }); + } + + @Override + public void start() throws Exception { + resultFuture = + CompletableFuture.runAsync(this::initializeAccessingComponents, cleanupExecutor); + } + + private void initializeAccessingComponents() { + initializeCompletedCheckpointStore(); + initializeCheckpointIDCounter(); + } + + private void initializeCompletedCheckpointStore() { + try { + this.completedCheckpointStore = + checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore( + getJobID(), jobManagerConfiguration, LOG); + + loadSharedStateFromCheckpoints(completedCheckpointStore); + } catch (Exception e) { + throw new CompletionException( + "Error occurred while initializing the CompletedCheckpointStore access.", e); + } + } + + private void loadSharedStateFromCheckpoints(CompletedCheckpointStore completedCheckpointStore) + throws Exception { + try (SharedStateRegistry sharedStateRegistry = + sharedStateRegistryFactory.create(cleanupExecutor)) { + // register all (shared) states from the checkpoint store with the new registry + for (CompletedCheckpoint completedCheckpoint : + completedCheckpointStore.getAllCheckpoints()) { + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + } + } + } + + private void initializeCheckpointIDCounter() { + try { + this.checkpointIDCounter = + checkpointRecoveryFactory.createCheckpointIDCounter(getJobID()); + } catch (Exception e) { + throw new CompletionException( + "Error occurred while initializing the CheckpointIDCounter access.", e); + } + } + + @Override + public CompletableFuture getJobMasterGateway() { + return FutureUtils.completedExceptionally( + new UnavailableDispatcherOperationException( + "Unable to get JobMasterGateway for job in cleanup phase. 
The requested operation is not available in that stage.")); + } + + @Override + public CompletableFuture getResultFuture() { + return CompletableFuture.completedFuture( + JobManagerRunnerResult.forSuccess(createExecutionGraphInfoFromJobResult())); + } + + @Override + public JobID getJobID() { + return jobResult.getJobId(); + } + + @Override + public CompletableFuture cancel(Time timeout) { + Preconditions.checkState( + resultFuture != null, "The CheckpointResourcesCleanupRunner was not started, yet."); + if (resultFuture.cancel(true)) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + return FutureUtils.completedExceptionally( + new FlinkException("Cleanup task couldn't be cancelled.")); + } + + @Override + public CompletableFuture requestJobStatus(Time timeout) { + return CompletableFuture.completedFuture(getJobStatus()); + } + + @Override + public CompletableFuture requestJobDetails(Time timeout) { + return requestJob(timeout) + .thenApply( + executionGraphInfo -> + JobDetails.createDetailsForJob( + executionGraphInfo.getArchivedExecutionGraph())); + } + + @Override + public CompletableFuture requestJob(Time timeout) { + return CompletableFuture.completedFuture(createExecutionGraphInfoFromJobResult()); + } + + @Override + public boolean isInitialized() { + return true; + } + + private ExecutionGraphInfo createExecutionGraphInfoFromJobResult() { + return generateExecutionGraphInfo(jobResult, initializationTimestamp); + } + + private JobStatus getJobStatus() { + return getJobStatus(jobResult); + } + + private static JobStatus getJobStatus(JobResult jobResult) { + return jobResult.getApplicationStatus().deriveJobStatus(); + } + + private static ExecutionGraphInfo generateExecutionGraphInfo( + JobResult jobResult, long initializationTimestamp) { + return new ExecutionGraphInfo( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( + jobResult.getJobId(), + "we might want to move the job name into the job result", + 
getJobStatus(jobResult), + jobResult.getSerializedThrowable().orElse(null), + null, + initializationTimestamp)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java index 1c60ccf698fc3..cbd7920e69dec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceProcessFactory.java @@ -68,7 +68,7 @@ public JobID getJobId() { @Override public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, jobName, jobStatus, cause, checkpointingSettings, initializationTimestamp); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index 431dab46bcce2..0ac161402ce1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -20,6 +20,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; @@ -33,7 +35,8 @@ *

    Contains extra logic for adding jobs with tasks, and removing jobs when they do not contain * tasks any more */ -public class JobManagerMetricGroup extends ComponentMetricGroup { +public class JobManagerMetricGroup extends ComponentMetricGroup + implements LocallyCleanableResource, GloballyCleanableResource { private final Map jobs = new HashMap<>(); @@ -84,7 +87,17 @@ public JobManagerJobMetricGroup addJob(JobID jobId, String jobName) { } } - public void removeJob(JobID jobId) { + @Override + public void globalCleanup(JobID jobId) { + cleanup(jobId); + } + + @Override + public void localCleanup(JobID jobId) { + cleanup(jobId); + } + + private void cleanup(JobID jobId) { if (jobId == null) { return; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index fdfa2384dda1b..62b0992f6a474 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -142,7 +142,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio jobGraph, ioExecutor, jobMasterConfiguration, - userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9f195d6103064..262424d4dd7f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -161,7 +161,6 @@ public SchedulerBase( final JobGraph jobGraph, final Executor ioExecutor, final Configuration jobMasterConfiguration, - final ClassLoader userCodeLoader, final CheckpointsCleaner checkpointsCleaner, final 
CheckpointRecoveryFactory checkpointRecoveryFactory, final JobManagerJobMetricGroup jobManagerJobMetricGroup, @@ -185,7 +184,6 @@ public SchedulerBase( SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled( jobGraph, jobMasterConfiguration, - userCodeLoader, checkNotNull(checkpointRecoveryFactory), log); this.checkpointIdCounter = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java index 481baa3bb8516..99126acd52cd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -18,9 +18,7 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -44,15 +42,14 @@ private SchedulerUtils() { public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpointingIsEnabled( JobGraph jobGraph, Configuration configuration, - ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Logger log) throws JobExecutionException { final JobID jobId = jobGraph.getJobID(); if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { try { - return createCompletedCheckpointStore( - configuration, userCodeLoader, checkpointRecoveryFactory, log, jobId); + return checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore( + jobId, configuration, log); } catch (Exception e) { throw new JobExecutionException( jobId, @@ -64,34 +61,6 @@ public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpoin } } - @VisibleForTesting - 
static CompletedCheckpointStore createCompletedCheckpointStore( - Configuration jobManagerConfig, - ClassLoader classLoader, - CheckpointRecoveryFactory recoveryFactory, - Logger log, - JobID jobId) - throws Exception { - int maxNumberOfCheckpointsToRetain = - jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); - - if (maxNumberOfCheckpointsToRetain <= 0) { - // warning and use 1 as the default value if the setting in - // state.checkpoints.max-retained-checkpoints is not greater than 0. - log.warn( - "The setting for '{} : {}' is invalid. Using default value of {}", - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), - maxNumberOfCheckpointsToRetain, - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); - - maxNumberOfCheckpointsToRetain = - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); - } - - return recoveryFactory.createRecoveredCompletedCheckpointStore( - jobId, maxNumberOfCheckpointsToRetain, classLoader); - } - public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnabled( JobGraph jobGraph, CheckpointRecoveryFactory checkpointRecoveryFactory) throws JobExecutionException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 01014e3aa1b4b..f22b02bcbe0bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -242,11 +242,7 @@ public AdaptiveScheduler( this.checkpointsCleaner = checkpointsCleaner; this.completedCheckpointStore = SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled( - jobGraph, - configuration, - userCodeClassLoader, - checkpointRecoveryFactory, - LOG); + jobGraph, configuration, checkpointRecoveryFactory, LOG); this.checkpointIdCounter = 
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled( jobGraph, checkpointRecoveryFactory); @@ -746,7 +742,7 @@ private VertexParallelism determineParallelism(SlotAllocator slotAllocator) @Override public ArchivedExecutionGraph getArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobInformation.getJobID(), jobInformation.getName(), jobStatus, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java index c70c991fe63b2..ccd076c70bb38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java @@ -44,7 +44,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -313,14 +312,12 @@ public void testPermanentBlobDeferredCleanup() throws IOException, InterruptedEx } @Test - public void testTransientBlobNoJobCleanup() - throws IOException, InterruptedException, ExecutionException { + public void testTransientBlobNoJobCleanup() throws Exception { testTransientBlobCleanup(null); } @Test - public void testTransientBlobForJobCleanup() - throws IOException, InterruptedException, ExecutionException { + public void testTransientBlobForJobCleanup() throws Exception { testTransientBlobCleanup(new JobID()); } @@ -328,8 +325,7 @@ public void testTransientBlobForJobCleanup() * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are * constantly accessed. 
*/ - private void testTransientBlobCleanup(@Nullable final JobID jobId) - throws IOException, InterruptedException, ExecutionException { + private void testTransientBlobCleanup(@Nullable final JobID jobId) throws Exception { // 1s should be a safe-enough buffer to still check for existence after a BLOB's last access long cleanupInterval = 1L; // in seconds @@ -386,7 +382,7 @@ private void testTransientBlobCleanup(@Nullable final JobID jobId) // files are cached now for the given TTL - remove from server so that they are not // re-downloaded if (jobId != null) { - server.cleanupJob(jobId, true); + server.globalCleanup(jobId); } else { server.deleteFromCache(key1); server.deleteFromCache(key2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 21b532f9cd691..3c8676852035d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -271,21 +271,22 @@ private void testDeleteBlobFails(@Nullable final JobID jobId, BlobKey.BlobType b } @Test - public void testJobCleanup() throws IOException { + public void testJobCleanup() throws Exception { testJobCleanup(TRANSIENT_BLOB); } @Test - public void testJobCleanupHa() throws IOException { + public void testJobCleanupHa() throws Exception { testJobCleanup(PERMANENT_BLOB); } /** - * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob}. + * Tests that {@link BlobServer} cleans up after calling {@link + * BlobServer#globalCleanup(JobID)}. 
* * @param blobType whether the BLOB should become permanent or transient */ - private void testJobCleanup(BlobKey.BlobType blobType) throws IOException { + private void testJobCleanup(BlobKey.BlobType blobType) throws Exception { JobID jobId1 = new JobID(); JobID jobId2 = new JobID(); @@ -314,7 +315,7 @@ private void testJobCleanup(BlobKey.BlobType blobType) throws IOException { verifyContents(server, jobId2, key2, data); checkFileCountForJob(1, jobId2, server); - server.cleanupJob(jobId1, true); + server.globalCleanup(jobId1); verifyDeleted(server, jobId1, key1a); verifyDeleted(server, jobId1, key1b); @@ -322,14 +323,14 @@ private void testJobCleanup(BlobKey.BlobType blobType) throws IOException { verifyContents(server, jobId2, key2, data); checkFileCountForJob(1, jobId2, server); - server.cleanupJob(jobId2, true); + server.globalCleanup(jobId2); checkFileCountForJob(0, jobId1, server); verifyDeleted(server, jobId2, key2); checkFileCountForJob(0, jobId2, server); // calling a second time should not fail - server.cleanupJob(jobId2, true); + server.globalCleanup(jobId2); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java index 57e6e2f2c7e14..0081b02693651 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java @@ -90,7 +90,7 @@ public void testBlobServerRecovery() throws Exception { * @throws IOException in case of failures */ public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) - throws IOException { + throws Exception { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId; @@ -139,8 +139,8 @@ public static void 
testBlobServerRecovery(final Configuration config, final Blob verifyDeleted(cache1, jobId[0], nonHAKey); // Remove again - server1.cleanupJob(jobId[0], true); - server1.cleanupJob(jobId[1], true); + server1.globalCleanup(jobId[0]); + server1.globalCleanup(jobId[1]); // Verify everything is clean assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath))); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java new file mode 100644 index 0000000000000..8e4e4b1c06ec2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.testutils.TestFileSystem; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** {@code FileSystemBlobStoreTest} tests the {@link FileSystemBlobStore} implementation. */ +public class FileSystemBlobStoreTest { + + private FileSystemBlobStore testInstance; + private Path storagePath; + + @BeforeEach + public void createTestInstance(@TempDir Path storagePath) throws IOException { + this.testInstance = new FileSystemBlobStore(new TestFileSystem(), storagePath.toString()); + this.storagePath = storagePath; + } + + public void finalizeTestInstance() throws IOException { + testInstance.close(); + } + + @Test + public void testSuccessfulPut() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("put"); + + final JobID jobId = new JobID(); + final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + assertThat(getBlobDirectoryPath()).isEmptyDirectory(); + + final boolean successfullyWritten = + testInstance.put(temporaryFile.toFile(), jobId, blobKey); + assertThat(successfullyWritten).isTrue(); + + assertThat(getPath(jobId)).isDirectory().exists(); + assertThat(getPath(jobId, blobKey)).isNotEmptyFile().hasSameTextualContentAs(temporaryFile); + } + + @Test + public void testMissingFilePut() throws IOException { + assertThatThrownBy( + () -> + testInstance.put( + new 
File("/not/existing/file"), + new JobID(), + new PermanentBlobKey())) + .isInstanceOf(FileNotFoundException.class); + } + + @Test + public void testSuccessfulGet() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("get"); + final JobID jobId = new JobID(); + final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + + assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue(); + + final Path targetFile = Files.createTempFile("filesystemblobstoretest-get-target-", ""); + assertThat(targetFile).isEmptyFile(); + final boolean successfullyGet = testInstance.get(jobId, blobKey, targetFile.toFile()); + assertThat(successfullyGet).isTrue(); + + assertThat(targetFile).hasSameTextualContentAs(temporaryFile); + } + + @Test + public void testGetWithWrongJobId() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("get"); + final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + + assertThat(testInstance.put(temporaryFile.toFile(), new JobID(), blobKey)).isTrue(); + + assertThatThrownBy( + () -> + testInstance.get( + new JobID(), + blobKey, + Files.createTempFile( + "filesystemblobstoretest-get-with-wrong-jobid-", + "") + .toFile())) + .isInstanceOf(FileNotFoundException.class); + } + + @Test + public void testGetWithWrongBlobKey() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("get"); + + final JobID jobId = new JobID(); + assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey())) + .isTrue(); + + assertThatThrownBy( + () -> + testInstance.get( + jobId, + new PermanentBlobKey(), + Files.createTempFile( + "filesystemblobstoretest-get-with-wrong-blobkey-", + "") + .toFile())) + .isInstanceOf(FileNotFoundException.class); + } + + @Test + public void testSuccessfulDeleteOnlyBlob() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("delete"); + final JobID jobId = new JobID(); + final 
BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + + assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue(); + + assertThat(getPath(jobId)).isDirectory().exists(); + assertThat(getPath(jobId, blobKey)).isNotEmptyFile(); + + final boolean successfullyDeleted = testInstance.delete(jobId, blobKey); + + assertThat(successfullyDeleted).isTrue(); + assertThat(getPath(jobId)).doesNotExist(); + } + + @Test + public void testSuccessfulDeleteBlob() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("delete"); + final JobID jobId = new JobID(); + final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + final BlobKey otherBlobKey = new PermanentBlobKey(); + + assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue(); + // create a second artifact so that the job directory is not removed with the blob + assertThat(testInstance.put(temporaryFile.toFile(), jobId, otherBlobKey)).isTrue(); + + assertThat(getPath(jobId)).isDirectory().exists(); + assertThat(getPath(jobId, blobKey)).isNotEmptyFile(); + assertThat(getPath(jobId, otherBlobKey)).isNotEmptyFile(); + + final boolean successfullyDeleted = testInstance.delete(jobId, blobKey); + + assertThat(successfullyDeleted).isTrue(); + assertThat(getPath(jobId, otherBlobKey)).exists(); + } + + @Test + public void testDeleteWithNotExistingJobId() { + assertThat(testInstance.delete(new JobID(), new PermanentBlobKey())).isTrue(); + } + + @Test + public void testDeleteWithNotExistingBlobKey() throws IOException { + final Path temporaryFile = createTemporaryFileWithContent("delete"); + final JobID jobId = new JobID(); + final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile); + + assertThat(testInstance.put(temporaryFile.toFile(), jobId, blobKey)).isTrue(); + assertThat(testInstance.delete(jobId, new PermanentBlobKey())).isTrue(); + assertThat(getPath(jobId, blobKey)).exists(); + } + + @Test + public void testDeleteAll() throws IOException { + final 
Path temporaryFile = createTemporaryFileWithContent("delete"); + final JobID jobId = new JobID(); + + assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey())) + .isTrue(); + assertThat(testInstance.put(temporaryFile.toFile(), jobId, new PermanentBlobKey())) + .isTrue(); + + assertThat(getPath(jobId)).isDirectory().exists(); + assertThat(getPath(jobId).toFile().listFiles()).hasSize(2); + + assertThat(testInstance.deleteAll(jobId)).isTrue(); + assertThat(getPath(jobId)).doesNotExist(); + } + + @Test + public void testDeleteAllWithNotExistingJobId() { + final JobID jobId = new JobID(); + assertThat(testInstance.deleteAll(jobId)).isTrue(); + assertThat(getPath(jobId)).doesNotExist(); + } + + private Path createTemporaryFileWithContent(String operationLabel) throws IOException { + final String actualContent = + String.format("Content for testing the %s operation", operationLabel); + final Path temporaryFile = + Files.createTempFile( + String.format("filesystemblobstoretest-%s-", operationLabel), ""); + try (BufferedWriter writer = + new BufferedWriter(new FileWriter(temporaryFile.toAbsolutePath().toString()))) { + writer.write(actualContent); + } + + return temporaryFile; + } + + private Path getBlobDirectoryPath() { + return storagePath.resolve(FileSystemBlobStore.BLOB_PATH_NAME); + } + + private Path getPath(JobID jobId) { + return getBlobDirectoryPath().resolve(String.format("job_%s", jobId)); + } + + private Path getPath(JobID jobId, BlobKey blobKey) { + return getPath(jobId).resolve(String.format("blob_%s", blobKey)); + } + + private BlobKey createPermanentBlobKeyFromFile(Path path) throws IOException { + Preconditions.checkArgument(!Files.isDirectory(path)); + Preconditions.checkArgument(Files.exists(path)); + + MessageDigest md = BlobUtils.createMessageDigest(); + try (InputStream is = new FileInputStream(path.toFile())) { + final byte[] buf = new byte[1024]; + int bytesRead = is.read(buf); + while (bytesRead >= 0) { + md.update(buf, 0, 
bytesRead); + bytesRead = is.read(buf); + } + + return BlobKey.createKey(BlobKey.BlobType.PERMANENT_BLOB, md.digest()); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java new file mode 100644 index 0000000000000..b1c1f03a93c7a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code CheckpointRecoveryFactoryTest} tests the default functionality of {@link + * CheckpointRecoveryFactory}. 
+ */ +public class CheckpointRecoveryFactoryTest { + + private static final Logger log = LoggerFactory.getLogger(CheckpointRecoveryFactoryTest.class); + + @ParameterizedTest(name = "actual: {0}; expected: {1}") + @CsvSource({"10,10", "0,1", "-1,1"}) + public void testMaxRemainingCheckpointsParameterSetting(int actualValue, int expectedValue) + throws Exception { + final JobID expectedJobId = new JobID(); + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, actualValue); + + final TestCheckpointRecoveryFactory testInstance = new TestCheckpointRecoveryFactory(); + assertThat( + testInstance.createRecoveredCompletedCheckpointStore( + expectedJobId, jobManagerConfig, log)) + .isNull(); + + assertThat(testInstance.getActualJobId()).isEqualTo(expectedJobId); + assertThat(testInstance.getActualMaximumNumberOfRetainedCheckpointsParamValue()) + .isEqualTo(expectedValue); + } + + private static class TestCheckpointRecoveryFactory implements CheckpointRecoveryFactory { + + private JobID actualJobId; + private int actualMaximumNumberOfRetainedCheckpointsParamValue; + + @Nullable + @Override + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { + this.actualJobId = jobId; + this.actualMaximumNumberOfRetainedCheckpointsParamValue = + maxNumberOfCheckpointsToRetain; + + return null; + } + + @Override + public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception { + throw new UnsupportedOperationException("createCheckpointIDCounter is not implemented"); + } + + public JobID getActualJobId() { + return actualJobId; + } + + public int getActualMaximumNumberOfRetainedCheckpointsParamValue() { + return actualMaximumNumberOfRetainedCheckpointsParamValue; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java index 6b65d00e4f091..6912299195a6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java @@ -38,21 +38,16 @@ public void testFactoryWithoutCheckpointStoreRecovery() throws Exception { final CheckpointRecoveryFactory factory = PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( maxCheckpoints -> store); - final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final JobID firstJobId = new JobID(); - assertSame( - store, factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + assertSame(store, factory.createRecoveredCompletedCheckpointStore(firstJobId, 1)); assertThrows( UnsupportedOperationException.class, - () -> factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + () -> factory.createRecoveredCompletedCheckpointStore(firstJobId, 1)); final JobID secondJobId = new JobID(); - assertSame( - store, - factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + assertSame(store, factory.createRecoveredCompletedCheckpointStore(secondJobId, 1)); assertThrows( UnsupportedOperationException.class, - () -> factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + () -> factory.createRecoveredCompletedCheckpointStore(secondJobId, 1)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java index f4e9256922a1f..b5133d523db7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java @@ -33,7 +33,7 @@ public TestingCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { + JobID jobId, int maxNumberOfCheckpointsToRetain) { return store; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java index 5bf7affee427b..3680be6ea0b28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java @@ -103,7 +103,7 @@ public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() final Error jobGraphRemovalError = new Error("Unable to remove job graph."); final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer( + .setGlobalCleanupConsumer( graph -> { throw jobGraphRemovalError; }) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 291fd5f1b9afe..7fcf9e2028740 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -55,18 +55,19 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.runtime.testutils.TestingJobGraphStore; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import 
org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -134,7 +135,8 @@ public class DispatcherResourceCleanupTest extends TestLogger { private CompletableFuture storedHABlobFuture; private CompletableFuture deleteAllHABlobsFuture; - private CompletableFuture cleanupJobFuture; + private CompletableFuture localCleanupFuture; + private CompletableFuture globalCleanupFuture; private CompletableFuture cleanupJobHADataFuture; private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; @@ -157,7 +159,7 @@ public void setup() throws Exception { jobResultStore = new SingleJobResultStore(jobId, clearedJobLatch); highAvailabilityServices.setJobResultStore(jobResultStore); cleanupJobHADataFuture = new CompletableFuture<>(); - highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture); + highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture); storedHABlobFuture = new CompletableFuture<>(); deleteAllHABlobsFuture = new CompletableFuture<>(); @@ -169,9 +171,15 @@ public void setup() throws Exception { .setDeleteAllFunction(deleteAllHABlobsFuture::complete) .createTestingBlobStore(); - cleanupJobFuture = new CompletableFuture<>(); + globalCleanupFuture = new CompletableFuture<>(); + localCleanupFuture = new CompletableFuture<>(); - blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture); + blobServer = + new TestingBlobServer( + configuration, + testingBlobStore, + jobId -> globalCleanupFuture.complete(jobId), + jobId -> localCleanupFuture.complete(jobId)); // upload a blob to the blob server 
permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]); @@ -259,8 +267,8 @@ public void testBlobServerCleanupWhenJobFinished() throws Exception { } private void assertThatHABlobsHaveBeenRemoved() - throws InterruptedException, ExecutionException { - assertThat(cleanupJobFuture.get(), equalTo(jobId)); + throws InterruptedException, ExecutionException, TimeoutException { + assertGlobalCleanupTriggered(jobId); // verify that we also cleared the BlobStore assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId)); @@ -284,8 +292,7 @@ public void testBlobServerCleanupWhenJobNotFinished() throws Exception { jobManagerRunnerFactory.takeCreatedJobManagerRunner(); suspendJob(testingJobManagerRunner); - assertThat(cleanupJobFuture.get(), equalTo(jobId)); - + assertLocalCleanupTriggered(jobId); assertThat(blobFile.exists(), is(false)); // verify that we did not clear the BlobStore @@ -322,8 +329,7 @@ public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { dispatcher.closeAsync().get(); - assertThat(cleanupJobFuture.get(), equalTo(jobId)); - + assertLocalCleanupTriggered(jobId); assertThat(blobFile.exists(), is(false)); // verify that we did not clear the BlobStore @@ -366,7 +372,7 @@ public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws Exceptio // check that no exceptions have been thrown dispatcherTerminationFuture.get(); - assertThat(cleanupJobFuture.get(), is(jobId)); + assertGlobalCleanupTriggered(jobId); assertThat(deleteAllHABlobsFuture.get(), is(jobId)); } @@ -462,7 +468,7 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio is(true)); } - assertThatHABlobsHaveNotBeenRemoved(); + assertThatNoCleanupWasTriggered(); } finally { finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner()); } @@ -513,8 +519,9 @@ private void terminateJobWithState( .build())); } - private void assertThatHABlobsHaveNotBeenRemoved() { - assertThat(cleanupJobFuture.isDone(), is(false)); + private void 
assertThatNoCleanupWasTriggered() { + assertThat(globalCleanupFuture.isDone(), is(false)); + assertThat(localCleanupFuture.isDone(), is(false)); assertThat(deleteAllHABlobsFuture.isDone(), is(false)); assertThat(blobFile.exists(), is(true)); } @@ -594,33 +601,13 @@ public Collection getDirtyResults() throws IOException { } } + @Ignore @Test - public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception { - jobGraphWriter = - TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer( - ignored -> { - throw new Exception("Failed to Remove future"); - }) - .withAutomaticStart() - .build(); - - final TestingJobManagerRunnerFactory jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); - - ArchivedExecutionGraph executionGraph = - new ArchivedExecutionGraphBuilder() - .setJobID(jobId) - .setState(JobStatus.CANCELED) - .build(); - - final TestingJobManagerRunner testingJobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph)); + public void testLocalCleanup() {} - assertThat(cleanupJobFuture.get(), equalTo(jobId)); - assertThat(deleteAllHABlobsFuture.isDone(), is(false)); - } + @Ignore + @Test + public void testGlobalCleanup() {} @Test public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception { @@ -637,37 +624,60 @@ public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception jobManagerRunnerFactory.takeCreatedJobManagerRunner(); testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph)); - assertThat(cleanupJobFuture.get(), equalTo(jobId)); + assertGlobalCleanupTriggered(jobId); assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId)); } + private void assertLocalCleanupTriggered(JobID jobId) + throws ExecutionException, InterruptedException, TimeoutException { + assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); + 
assertThat(globalCleanupFuture.isDone(), is(false)); + } + + private void assertGlobalCleanupTriggered(JobID jobId) + throws ExecutionException, InterruptedException, TimeoutException { + assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); + assertThat(globalCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); + } + private static final class TestingBlobServer extends BlobServer { - private final CompletableFuture cleanupJobFuture; + private final ThrowingConsumer globalCleanupConsumer; + private final ThrowingConsumer localCleanupConsumer; /** * Instantiates a new BLOB server and binds it to a free network port. * * @param config Configuration to be used to instantiate the BlobServer * @param blobStore BlobStore to store blobs persistently - * @param cleanupJobFuture + * @param globalCleanupConsumer The consumer called along the actual {@link + * #globalCleanup(JobID)} call. + * @param localCleanupConsumer The consumer called along the actual {@link + * #localCleanup(JobID)} call. 
* @throws IOException thrown if the BLOB server cannot bind to a free network port or if * the (local or distributed) file storage cannot be created or is not usable */ public TestingBlobServer( Configuration config, BlobStore blobStore, - CompletableFuture cleanupJobFuture) + ThrowingConsumer globalCleanupConsumer, + ThrowingConsumer localCleanupConsumer) throws IOException { super(config, blobStore); - this.cleanupJobFuture = cleanupJobFuture; + this.globalCleanupConsumer = globalCleanupConsumer; + this.localCleanupConsumer = localCleanupConsumer; + } + + @Override + public void globalCleanup(JobID jobId) throws Exception { + super.globalCleanup(jobId); + globalCleanupConsumer.accept(jobId); } @Override - public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { - final boolean result = super.cleanupJob(jobId, cleanupBlobStoreFiles); - cleanupJobFuture.complete(jobId); - return result; + public void localCleanup(JobID jobId) throws IOException { + super.localCleanup(jobId); + localCleanupConsumer.accept(jobId); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index b5434f1c50c58..5023125277e38 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -478,7 +478,7 @@ public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception testingJobManagerRunner.completeResultFuture( JobManagerRunnerResult.forInitializationFailure( new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, jobGraph.getName(), JobStatus.FAILED, @@ -640,7 +640,7 @@ public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception { testingJobManagerRunner.completeResultFuture( 
JobManagerRunnerResult.forInitializationFailure( new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, jobGraph.getName(), JobStatus.FAILED, @@ -681,15 +681,15 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { // Track cleanup - ha-services final CompletableFuture cleanupJobData = new CompletableFuture<>(); - haServices.setCleanupJobDataFuture(cleanupJobData); + haServices.setGlobalCleanupFuture(cleanupJobData); cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES)); // Track cleanup - job-graph final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setReleaseJobGraphConsumer( + .setLocalCleanupConsumer( jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE)) - .setRemoveJobGraphConsumer( + .setGlobalCleanupConsumer( jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE)) .build(); jobGraphStore.start(null); @@ -846,8 +846,8 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { final TestingJobGraphStore testingJobGraphStore = TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer(removeJobGraphFuture::complete) - .setReleaseJobGraphConsumer(releaseJobGraphFuture::complete) + .setGlobalCleanupConsumer(removeJobGraphFuture::complete) + .setLocalCleanupConsumer(releaseJobGraphFuture::complete) .build(); testingJobGraphStore.start(null); @@ -907,15 +907,15 @@ private void testJobDataAreCleanedUpInCorrectOrder(JobStatus jobStatus) throws E // Track cleanup - ha-services final CompletableFuture cleanupJobData = new CompletableFuture<>(); - haServices.setCleanupJobDataFuture(cleanupJobData); + haServices.setGlobalCleanupFuture(cleanupJobData); cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES)); // Track cleanup - job-graph final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setReleaseJobGraphConsumer( + .setLocalCleanupConsumer( jobId -> 
cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE)) - .setRemoveJobGraphConsumer( + .setGlobalCleanupConsumer( jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE)) .build(); jobGraphStore.start(null); @@ -1158,7 +1158,7 @@ public TestingJobManagerRunner createJobManagerRunner( CompletableFuture.completedFuture( new ExecutionGraphInfo( ArchivedExecutionGraph - .createFromInitializingJob( + .createSparseArchivedExecutionGraph( jobGraph.getJobID(), jobGraph.getName(), JobStatus.RUNNING, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistryTest.java new file mode 100644 index 0000000000000..3757d15adc6c7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistryTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.function.BiFunctionWithException; + +import org.assertj.core.api.ThrowingConsumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * {@code JobManagerRunnerRegistryTest} tests the functionality of {@link JobManagerRunnerRegistry}. + */ +public class JobManagerRunnerRegistryTest { + + private JobManagerRunnerRegistry testInstance; + + @BeforeEach + public void setup() { + testInstance = new JobManagerRunnerRegistry(4); + } + + @Test + public void testIsRegistered() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + assertThat(testInstance.isRegistered(jobId)).isTrue(); + } + + @Test + public void testIsNotRegistered() { + assertThat(testInstance.isRegistered(new JobID())).isFalse(); + } + + @Test + public void testRegister() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + assertThat(testInstance.isRegistered(jobId)).isTrue(); + } + + @Test + public void testRegisteringTwiceCausesFailure() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + assertThat(testInstance.isRegistered(jobId)).isTrue(); + + assertThatThrownBy( + () -> + 
testInstance.register( + TestingJobManagerRunner.newBuilder() + .setJobId(jobId) + .build())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testGet() { + final JobID jobId = new JobID(); + final JobManagerRunner jobManagerRunner = + TestingJobManagerRunner.newBuilder().setJobId(jobId).build(); + testInstance.register(jobManagerRunner); + + assertThat(testInstance.get(jobId)).isEqualTo(jobManagerRunner); + } + + @Test + public void testGetOnNonExistingJobManagerRunner() { + assertThatThrownBy(() -> testInstance.get(new JobID())) + .isInstanceOf(NoSuchElementException.class); + } + + @Test + public void size() { + assertThat(testInstance.size()).isEqualTo(0); + testInstance.register(TestingJobManagerRunner.newBuilder().build()); + assertThat(testInstance.size()).isEqualTo(1); + testInstance.register(TestingJobManagerRunner.newBuilder().build()); + assertThat(testInstance.size()).isEqualTo(2); + } + + @Test + public void testGetRunningJobIds() { + assertThat(testInstance.getRunningJobIds()).isEmpty(); + + final JobID jobId0 = new JobID(); + final JobID jobId1 = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId0).build()); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId1).build()); + + assertThat(testInstance.getRunningJobIds()).containsExactlyInAnyOrder(jobId0, jobId1); + } + + @Test + public void testGetJobManagerRunners() { + assertThat(testInstance.getJobManagerRunners()).isEmpty(); + + final JobManagerRunner jobManagerRunner0 = TestingJobManagerRunner.newBuilder().build(); + final JobManagerRunner jobManagerRunner1 = TestingJobManagerRunner.newBuilder().build(); + testInstance.register(jobManagerRunner0); + testInstance.register(jobManagerRunner1); + + assertThat(testInstance.getJobManagerRunners()) + .containsExactlyInAnyOrder(jobManagerRunner0, jobManagerRunner1); + } + + @Test + public void testSuccessfulGlobalCleanup() throws Throwable { + 
testSuccessfulSynchronousCleanup(testInstance::globalCleanup); + } + + @Test + public void testSuccessfulLocalCleanup() throws Throwable { + testSuccessfulSynchronousCleanup(testInstance::localCleanup); + } + + private void testSuccessfulSynchronousCleanup(ThrowingConsumer callback) + throws Throwable { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + callback.acceptThrows(jobManagerRunner.getJobID()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(jobManagerRunner.getTerminationFuture()).isCompleted(); + } + + @Test + public void testFailingGlobalCleanup() { + testFailingSynchronousCleanup(testInstance::globalCleanup); + } + + @Test + public void testFailingLocalCleanup() { + testFailingSynchronousCleanup(testInstance::localCleanup); + } + + private void testFailingSynchronousCleanup(ThrowingConsumer callback) { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + jobManagerRunner.completeTerminationFutureExceptionally(expectedException); + + assertThatThrownBy(() -> callback.acceptThrows(jobManagerRunner.getJobID())) + .isInstanceOf(FlinkException.class) + .hasCause(expectedException); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + } + + @Test + public void testSuccessfulGlobalCleanupAsync() throws Exception { + testSuccessfulCleanupAsync(testInstance::globalCleanupAsync); + } + + @Test + public void testSuccessfulLocalCleanupAsync() throws Exception { + testSuccessfulCleanupAsync(testInstance::localCleanupAsync); + } + + private void testSuccessfulCleanupAsync( + BiFunctionWithException, Exception> callback) + throws Exception { + final TestingJobManagerRunner 
jobManagerRunner = registerTestingJobManagerRunner(); + + final CompletableFuture cleanupResult = + callback.apply(jobManagerRunner.getJobID(), Executors.directExecutor()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(cleanupResult).isCompleted(); + } + + @Test + public void testFailingGlobalCleanupAsync() throws Exception { + testFailingCleanupAsync(testInstance::globalCleanupAsync); + } + + @Test + public void testFailingLocalCleanupAsync() throws Exception { + testFailingCleanupAsync(testInstance::localCleanupAsync); + } + + private void testFailingCleanupAsync( + BiFunctionWithException, Exception> callback) + throws Exception { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + jobManagerRunner.completeTerminationFutureExceptionally(expectedException); + + final CompletableFuture cleanupResult = + callback.apply(jobManagerRunner.getJobID(), Executors.directExecutor()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(cleanupResult) + .isCompletedExceptionally() + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .withCause(expectedException); + } + + private TestingJobManagerRunner registerTestingJobManagerRunner() { + final TestingJobManagerRunner jobManagerRunner = + TestingJobManagerRunner.newBuilder().build(); + testInstance.register(jobManagerRunner); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + return jobManagerRunner; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java index 7a2d156e1945c..063cc2562355d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java @@ -30,8 +30,5 @@ public enum NoOpJobGraphWriter implements JobGraphWriter { public void putJobGraph(JobGraph jobGraph) throws Exception {} @Override - public void removeJobGraph(JobID jobId) throws Exception {} - - @Override - public void releaseJobGraph(JobID jobId) throws Exception {} + public void localCleanup(JobID jobId) throws Exception {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java new file mode 100644 index 0000000000000..8e6085d2cb585 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; + +import static org.assertj.core.api.Assertions.assertThat; + +/** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */ +public class DefaultResourceCleanerTest { + + private static final Executor EXECUTOR = Executors.directExecutor(); + private static final JobID JOB_ID = new JobID(); + + private DefaultResourceCleaner testInstance; + private CleanupCallbackWithTrigger cleanup0; + private CleanupCallbackWithTrigger cleanup1; + + @BeforeEach + public void setup() { + cleanup0 = new CleanupCallbackWithTrigger(); + cleanup1 = new CleanupCallbackWithTrigger(); + + testInstance = new DefaultResourceCleaner(EXECUTOR); + testInstance.withCleanupOf(cleanup0).withCleanupOf(cleanup1); + } + + @Test + public void testSuccessfulConcurrentCleanup() { + CompletableFuture cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0) + .extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + assertThat(cleanup1) + .extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + + cleanup0.completeCleanup(); + assertThat(cleanupResult).isNotCompleted(); + + cleanup1.completeCleanup(); + assertThat(cleanupResult).isCompleted(); + } + + @Test + public void testConcurrentCleanupWithExceptionFirst() { + CompletableFuture cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0) + 
.extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + assertThat(cleanup1) + .extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + cleanup0.completeCleanupExceptionally(expectedException); + assertThat(cleanupResult).isNotCompleted(); + + cleanup1.completeCleanup(); + assertThat(cleanupResult) + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .withCause(expectedException); + } + + @Test + public void testConcurrentCleanupWithExceptionSecond() { + CompletableFuture cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0) + .extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + assertThat(cleanup1) + .extracting(CleanupCallbackWithTrigger::getProcessedJobId) + .isEqualTo(JOB_ID); + + cleanup0.completeCleanup(); + assertThat(cleanupResult).isNotCompleted(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + cleanup1.completeCleanupExceptionally(expectedException); + assertThat(cleanupResult) + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .withCause(expectedException); + } + + private static class CleanupCallbackWithTrigger + implements BiFunction> { + + private final CompletableFuture resultFuture = new CompletableFuture<>(); + private JobID jobId; + + @Override + public CompletableFuture apply(JobID jobId, Executor executor) { + Preconditions.checkState(this.jobId == null); + this.jobId = jobId; + + return resultFuture; + } + + public JobID getProcessedJobId() { + return jobId; + } + + public void completeCleanup() { + this.resultFuture.complete(null); + } + + public void completeCleanupExceptionally(Throwable expectedException) { + this.resultFuture.completeExceptionally(expectedException); + } + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResourceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResourceTest.java new file mode 100644 index 0000000000000..00fe5641c4500 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResourceTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * {@code GloballyCleanableResourceTest} tests the default implementation of {@link + * GloballyCleanableResource}. 
+ */ +public class GloballyCleanableResourceTest { + + @Test + public void testGlobalCleanupAsync() { + final CompletableFuture globalCleanupTriggered = new CompletableFuture<>(); + final GloballyCleanableResource testInstance = + TestingCleanableResource.builder() + .withGlobalCleanupConsumer(globalCleanupTriggered::complete) + .build(); + + final JobID jobId = new JobID(); + testInstance.globalCleanupAsync(jobId, Executors.directExecutor()); + + assertThat(globalCleanupTriggered).isCompletedWithValue(jobId); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResourceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResourceTest.java new file mode 100644 index 0000000000000..8db2841fccfdc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResourceTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * {@code LocallyCleanableResourceTest} tests the default implementation of {@link + * LocallyCleanableResource}. + */ +public class LocallyCleanableResourceTest { + + @Test + public void testLocalCleanupAsync() { + final CompletableFuture localCleanupTriggered = new CompletableFuture<>(); + final LocallyCleanableResource testInstance = + TestingCleanableResource.builder() + .withLocalCleanupConsumer(localCleanupTriggered::complete) + .build(); + + final JobID jobId = new JobID(); + testInstance.localCleanupAsync(jobId, Executors.directExecutor()); + + assertThat(localCleanupTriggered).isCompletedWithValue(jobId); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingCleanableResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingCleanableResource.java new file mode 100644 index 0000000000000..0380e11c4208d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingCleanableResource.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * {@code TestingCleanableResource} provides methods for testing the callbacks of {@link + * LocallyCleanableResource}. + */ +class TestingCleanableResource implements LocallyCleanableResource, GloballyCleanableResource { + + private final ThrowingConsumer localCleanupConsumer; + private final ThrowingConsumer globalCleanupConsumer; + + private TestingCleanableResource( + ThrowingConsumer localCleanupConsumer, + ThrowingConsumer globalCleanupConsumer) { + this.localCleanupConsumer = localCleanupConsumer; + this.globalCleanupConsumer = globalCleanupConsumer; + } + + @Override + public void localCleanup(JobID jobId) throws Exception { + localCleanupConsumer.accept(jobId); + } + + @Override + public void globalCleanup(JobID jobId) throws Exception { + globalCleanupConsumer.accept(jobId); + } + + public static TestingCleanableResource.Builder builder() { + return new Builder(); + } + + static class Builder { + + private ThrowingConsumer localCleanupConsumer = + jobId -> { + throw new UnsupportedOperationException("Local cleanup is not supported."); + }; + private ThrowingConsumer globalCleanupConsumer = + jobId -> { + throw new UnsupportedOperationException("Global cleanup is not supported."); + }; + + private Builder() {} + + public Builder withLocalCleanupConsumer( + ThrowingConsumer localCleanupConsumer) { + this.localCleanupConsumer = localCleanupConsumer; + return this; + } + + 
public Builder withGlobalCleanupConsumer( + ThrowingConsumer globalCleanupConsumer) { + this.globalCleanupConsumer = globalCleanupConsumer; + return this; + } + + public TestingCleanableResource build() { + return new TestingCleanableResource(localCleanupConsumer, globalCleanupConsumer); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java index 11c1abf19363a..61e2afd8eaf74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java @@ -360,7 +360,7 @@ public void onRemovedJobGraph_terminatesRunningJob() throws Exception { dispatcherLeaderProcess.getDispatcherGateway().get(); // now remove the Job from the JobGraphStore and notify the dispatcher service - jobGraphStore.removeJobGraph(JOB_GRAPH.getJobID()); + jobGraphStore.globalCleanup(JOB_GRAPH.getJobID()); dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID()); assertThat(terminateJobFuture.get(), is(JOB_GRAPH.getJobID())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index b5536e831b427..08e63ce6b78d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -152,7 +152,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // Remove blobs again - server[1].cleanupJob(jobId, true); + server[1].globalCleanup(jobId); // Verify everything is 
clean below recoveryDir/ final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index e751f64ad9d6b..a79592795e24e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -152,7 +152,7 @@ public void testSerialization() throws IOException, ClassNotFoundException { @Test public void testCreateFromInitializingJobForSuspendedJob() { final ArchivedExecutionGraph suspendedExecutionGraph = - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( new JobID(), "TestJob", JobStatus.SUSPENDED, @@ -170,7 +170,7 @@ public void testCheckpointSettingsArchiving() { CheckpointCoordinatorConfiguration.builder().build(); final ArchivedExecutionGraph archivedGraph = - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( new JobID(), "TestJob", JobStatus.INITIALIZING, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java index ea2dfea3c849a..7e5adfe15c941 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java @@ -30,6 +30,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.Test; @@ 
-39,7 +40,6 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.function.Consumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; @@ -129,7 +129,7 @@ public void testCleanupJobData() throws Exception { () -> {}, jobCleanupFuture::complete); - haServices.cleanupJobData(jobID); + haServices.globalCleanup(jobID); JobID jobIDCleaned = jobCleanupFuture.get(); assertThat(jobIDCleaned, is(jobID)); } @@ -184,7 +184,7 @@ private static final class TestingHaServices extends AbstractHaServices { private final Queue closeOperations; private final RunnableWithException internalCleanupRunnable; - private final Consumer internalJobCleanupConsumer; + private final ThrowingConsumer internalJobCleanupConsumer; private TestingHaServices( Configuration config, @@ -192,7 +192,7 @@ private TestingHaServices( BlobStoreService blobStoreService, Queue closeOperations, RunnableWithException internalCleanupRunnable, - Consumer internalJobCleanupConsumer) { + ThrowingConsumer internalJobCleanupConsumer) { super(config, ioExecutor, blobStoreService); this.closeOperations = closeOperations; this.internalCleanupRunnable = internalCleanupRunnable; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index d98150a5fb5c6..e4c573f21f4cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -73,7 +73,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private CompletableFuture closeAndCleanupAllDataFuture = new CompletableFuture<>(); - private volatile CompletableFuture jobCleanupFuture; + private 
volatile CompletableFuture globalCleanupFuture; + private volatile CompletableFuture localCleanupFuture; // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -148,8 +149,12 @@ public void setCloseAndCleanupAllDataFuture( this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture; } - public void setCleanupJobDataFuture(CompletableFuture jobCleanupFuture) { - this.jobCleanupFuture = jobCleanupFuture; + public void setGlobalCleanupFuture(CompletableFuture globalCleanupFuture) { + this.globalCleanupFuture = globalCleanupFuture; + } + + public void setLocalCleanupFuture(CompletableFuture localCleanupFuture) { + this.localCleanupFuture = localCleanupFuture; } // ------------------------------------------------------------------------ @@ -286,7 +291,12 @@ public void closeAndCleanupAllData() throws Exception { } @Override - public void cleanupJobData(JobID jobID) { - Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID)); + public void globalCleanup(JobID jobID) throws Exception { + Optional.ofNullable(globalCleanupFuture).ifPresent(f -> f.complete(jobID)); + } + + @Override + public void localCleanup(JobID jobID) throws Exception { + Optional.ofNullable(localCleanupFuture).ifPresent(f -> f.complete(jobID)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java index 56b9b69e61f5f..3a65abf241620 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java @@ -174,7 +174,7 @@ public void testCleanupJobData() throws Exception { haServices -> { final List childrenBefore = client.getChildren().forPath(path); - 
haServices.cleanupJobData(jobID); + haServices.globalCleanup(jobID); final List childrenAfter = client.getChildren().forPath(path); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java index 568e937716ae8..ccabb3cf69390 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java @@ -200,7 +200,7 @@ public void testRemoveJobGraph() throws Exception { final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); jobGraphStore.putJobGraph(testingJobGraph); - jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + jobGraphStore.globalCleanup(testingJobGraph.getJobID()); final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS); assertThat(actual, is(testingJobGraph.getJobID())); } @@ -213,7 +213,7 @@ public void testRemoveJobGraphWithNonExistName() throws Exception { .build(); final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); - jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + jobGraphStore.globalCleanup(testingJobGraph.getJobID()); try { removeFuture.get(timeout, TimeUnit.MILLISECONDS); @@ -346,7 +346,7 @@ public void testReleasingJobGraphShouldReleaseHandle() throws Exception { builder.setReleaseConsumer(releaseFuture::complete).build(); final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); jobGraphStore.putJobGraph(testingJobGraph); - jobGraphStore.releaseJobGraph(testingJobGraph.getJobID()); + jobGraphStore.localCleanup(testingJobGraph.getJobID()); final String actual = releaseFuture.get(); assertThat(actual, is(testingJobGraph.getJobID().toString())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java index 9f89d9f5669f0..d6c85a0c617dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java @@ -31,7 +31,7 @@ public class StandaloneJobGraphStoreTest { /** Tests that all operations work and don't change the state. */ @Test - public void testNoOps() { + public void testNoOps() throws Exception { StandaloneJobGraphStore jobGraphs = new StandaloneJobGraphStore(); JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph(); @@ -41,7 +41,7 @@ public void testNoOps() { jobGraphs.putJobGraph(jobGraph); assertEquals(0, jobGraphs.getJobIds().size()); - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanup(jobGraph.getJobID()); assertEquals(0, jobGraphs.getJobIds().size()); assertNull(jobGraphs.recoverJobGraph(new JobID())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java index ad5bcf81829d3..0773c404803a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java @@ -130,7 +130,7 @@ public void testPutAndRemoveJobGraph() throws Exception { verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Remove - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanup(jobGraph.getJobID()); // Empty state assertEquals(0, jobGraphs.getJobIds().size()); @@ -140,7 +140,7 @@ public void testPutAndRemoveJobGraph() throws Exception { verify(listener, never()).onRemovedJobGraph(any(JobID.class)); // Don't fail if called again - jobGraphs.removeJobGraph(jobGraph.getJobID()); + 
jobGraphs.globalCleanup(jobGraph.getJobID()); } finally { jobGraphs.stop(); } @@ -193,7 +193,7 @@ public void testRecoverJobGraphs() throws Exception { verifyJobGraphs(expected.get(jobGraph.getJobID()), jobGraph); - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanup(jobGraph.getJobID()); } // Empty state @@ -313,7 +313,7 @@ public void testJobGraphRemovalFailureAndLockRelease() throws Exception { assertThat(recoveredJobGraph, is(notNullValue())); try { - otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID()); + otherSubmittedJobGraphStore.globalCleanup(recoveredJobGraph.getJobID()); fail( "It should not be possible to remove the JobGraph since the first store still has a lock on it."); } catch (Exception ignored) { @@ -323,7 +323,7 @@ public void testJobGraphRemovalFailureAndLockRelease() throws Exception { submittedJobGraphStore.stop(); // now we should be able to delete the job graph - otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID()); + otherSubmittedJobGraphStore.globalCleanup(recoveredJobGraph.getJobID()); assertThat( otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index cfce046018c6e..cf5ba56a20bfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -52,7 +52,7 @@ public class DefaultJobMasterServiceProcessTest extends TestLogger { private static final Function failedArchivedExecutionGraphFactory = (throwable -> - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, "test", JobStatus.FAILED, throwable, null, 1337)); 
@Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 11fd24949905e..fa55290ce224a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -257,7 +257,7 @@ public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializat @Nonnull private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException testException) { return new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobGraph.getJobID(), jobGraph.getName(), JobStatus.FAILED, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index e6d613ccf7bc0..3b431fbf51112 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -64,7 +64,7 @@ private TestingJobManagerRunner( final ExecutionGraphInfo suspendedExecutionGraphInfo = new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, "TestJob", JobStatus.SUSPENDED, null, null, 0L), null); terminationFuture.whenComplete( @@ -150,6 +150,10 @@ public void completeTerminationFuture() { terminationFuture.complete(null); } + public void completeTerminationFutureExceptionally(Throwable expectedException) { + terminationFuture.completeExceptionally(expectedException); + } + public CompletableFuture getTerminationFuture() { return terminationFuture; 
} @@ -161,7 +165,7 @@ public void completeJobMasterGatewayFuture(JobMasterGateway testingJobMasterGate /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. */ public static class Builder { - private JobID jobId = null; + private JobID jobId = new JobID(); private boolean blockingTermination = false; private CompletableFuture jobMasterGatewayFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java index 379c747d5eeb3..40aa709bf57f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactory.java @@ -64,7 +64,7 @@ public JobID getJobId() { @Override public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, jobName, jobStatus, cause, null, initializationTimestamp); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java index 986b1bd9be05d..2f316be42b6fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceProcessFactoryOld.java @@ -72,7 +72,7 @@ public JobID getJobId() { @Override public ArchivedExecutionGraph createArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable 
cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobId, "test-job", jobStatus, cause, null, System.currentTimeMillis()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 958c01a4a189b..ed74b2e363019 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -62,12 +62,12 @@ public void addAndRemoveJobs() throws Exception { assertEquals(2, group.numRegisteredJobMetricGroups()); - group.removeJob(jid1); + group.globalCleanup(jid1); assertTrue(jmJobGroup11.isClosed()); assertEquals(1, group.numRegisteredJobMetricGroups()); - group.removeJob(jid2); + group.globalCleanup(jid2); assertTrue(jmJobGroup21.isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java deleted file mode 100644 index 23073aa55a0c9..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** Tests for the {@link SchedulerUtils} utilities. */ -public class SchedulerUtilsTest extends TestLogger { - - @Test - public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { - - final int maxNumberOfCheckpointsToRetain = 10; - final Configuration jobManagerConfig = new Configuration(); - jobManagerConfig.setInteger( - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain); - - final CompletedCheckpointStore completedCheckpointStore = - SchedulerUtils.createCompletedCheckpointStore( - jobManagerConfig, - getClass().getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - log, - new JobID()); - - assertEquals( - maxNumberOfCheckpointsToRetain, - completedCheckpointStore.getMaxNumberOfRetainedCheckpoints()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java index 3709149bb649b..6887308f01b9a 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java @@ -117,7 +117,7 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) { @Override public ArchivedExecutionGraph getArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( new JobID(), "testJob", jobStatus, cause, null, 0L); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index 12533aa1c71b1..edf7bbc77e847 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -201,7 +201,7 @@ public void goToExecuting(ExecutionGraph executionGraph) { @Override public ArchivedExecutionGraph getArchivedExecutionGraph( JobStatus jobStatus, @Nullable Throwable cause) { - return ArchivedExecutionGraph.createFromInitializingJob( + return ArchivedExecutionGraph.createSparseArchivedExecutionGraph( new JobID(), "testJob", jobStatus, cause, null, 0L); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java index e9a797f0f47f2..9fa121a31d39f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java @@ -54,9 +54,9 @@ public class TestingJobGraphStore implements JobGraphStore { private final ThrowingConsumer putJobGraphConsumer; - 
private final ThrowingConsumer removeJobGraphConsumer; + private final ThrowingConsumer globalCleanupConsumer; - private final ThrowingConsumer releaseJobGraphConsumer; + private final ThrowingConsumer localCleanupConsumer; private boolean started; @@ -68,16 +68,16 @@ private TestingJobGraphStore( BiFunctionWithException, JobGraph, ? extends Exception> recoverJobGraphFunction, ThrowingConsumer putJobGraphConsumer, - ThrowingConsumer removeJobGraphConsumer, - ThrowingConsumer releaseJobGraphConsumer, + ThrowingConsumer globalCleanupConsumer, + ThrowingConsumer localCleanupConsumer, Collection initialJobGraphs) { this.startConsumer = startConsumer; this.stopRunnable = stopRunnable; this.jobIdsFunction = jobIdsFunction; this.recoverJobGraphFunction = recoverJobGraphFunction; this.putJobGraphConsumer = putJobGraphConsumer; - this.removeJobGraphConsumer = removeJobGraphConsumer; - this.releaseJobGraphConsumer = releaseJobGraphConsumer; + this.globalCleanupConsumer = globalCleanupConsumer; + this.localCleanupConsumer = localCleanupConsumer; for (JobGraph initialJobGraph : initialJobGraphs) { storedJobs.put(initialJobGraph.getJobID(), initialJobGraph); @@ -110,16 +110,16 @@ public synchronized void putJobGraph(JobGraph jobGraph) throws Exception { } @Override - public synchronized void removeJobGraph(JobID jobId) throws Exception { + public synchronized void globalCleanup(JobID jobId) throws Exception { verifyIsStarted(); - removeJobGraphConsumer.accept(jobId); + globalCleanupConsumer.accept(jobId); storedJobs.remove(jobId); } @Override - public synchronized void releaseJobGraph(JobID jobId) throws Exception { + public synchronized void localCleanup(JobID jobId) throws Exception { verifyIsStarted(); - releaseJobGraphConsumer.accept(jobId); + localCleanupConsumer.accept(jobId); } @Override @@ -156,10 +156,9 @@ public static class Builder { private ThrowingConsumer putJobGraphConsumer = ignored -> {}; - private ThrowingConsumer removeJobGraphConsumer = ignored -> {}; + 
private ThrowingConsumer globalCleanupConsumer = ignored -> {}; - private ThrowingConsumer releaseJobGraphConsumer = - ignored -> {}; + private ThrowingConsumer localCleanupConsumer = ignored -> {}; private Collection initialJobGraphs = Collections.emptyList(); @@ -198,15 +197,15 @@ public Builder setPutJobGraphConsumer( return this; } - public Builder setRemoveJobGraphConsumer( - ThrowingConsumer removeJobGraphConsumer) { - this.removeJobGraphConsumer = removeJobGraphConsumer; + public Builder setGlobalCleanupConsumer( + ThrowingConsumer globalCleanupConsumer) { + this.globalCleanupConsumer = globalCleanupConsumer; return this; } - public Builder setReleaseJobGraphConsumer( - ThrowingConsumer releaseJobGraphConsumer) { - this.releaseJobGraphConsumer = releaseJobGraphConsumer; + public Builder setLocalCleanupConsumer( + ThrowingConsumer localCleanupConsumer) { + this.localCleanupConsumer = localCleanupConsumer; return this; } @@ -228,8 +227,8 @@ public TestingJobGraphStore build() { jobIdsFunction, recoverJobGraphFunction, putJobGraphConsumer, - removeJobGraphConsumer, - releaseJobGraphConsumer, + globalCleanupConsumer, + localCleanupConsumer, initialJobGraphs); if (startJobGraphStore) {