diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java index f1d08e184ac6e..49197f68f7766 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherGatewayServiceFactory.java @@ -26,11 +26,13 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents; import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess; import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayService; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkRuntimeException; @@ -83,7 +85,9 @@ public ApplicationDispatcherGatewayServiceFactory( public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( DispatcherId fencingToken, Collection recoveredJobs, - JobGraphWriter jobGraphWriter) { + Collection globallyTerminatedJobs, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore) { final List recoveredJobIds = getRecoveredJobIds(recoveredJobs); @@ -94,6 +98,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, (dispatcherGateway, scheduledExecutor, errorHandler) -> new ApplicationDispatcherBootstrap( application, @@ -102,8 +107,8 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( dispatcherGateway, scheduledExecutor, errorHandler), - PartialDispatcherServicesWithJobGraphStore.from( - partialDispatcherServices, jobGraphWriter)); + PartialDispatcherServicesWithJobPersistenceComponents.from( + partialDispatcherServices, jobGraphWriter, jobResultStore)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherLeaderProcessFactoryFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherLeaderProcessFactoryFactory.java index 2c1ac37187f23..03b94a88ef9e2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherLeaderProcessFactoryFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherLeaderProcessFactoryFactory.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactory; import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory; import org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessFactory; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -59,7 +59,7 @@ private ApplicationDispatcherLeaderProcessFactoryFactory( @Override public DispatcherLeaderProcessFactory createFactory( - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices, @@ -74,7 +74,10 @@ public DispatcherLeaderProcessFactory createFactory( partialDispatcherServices); return new SessionDispatcherLeaderProcessFactory( - dispatcherServiceFactory, jobGraphStoreFactory, ioExecutor, fatalErrorHandler); + dispatcherServiceFactory, + jobPersistenceComponentFactory, + ioExecutor, + fatalErrorHandler); } public static ApplicationDispatcherLeaderProcessFactoryFactory create( diff --git a/flink-core/src/main/java/org/apache/flink/util/function/QuintFunction.java b/flink-core/src/main/java/org/apache/flink/util/function/QuintFunction.java new file mode 100644 index 0000000000000..327dc865b1872 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/function/QuintFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.function; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Function which takes three arguments. + * + * @param type of the first argument + * @param type of the second argument + * @param type of the third argument + * @param type of the fourth argument + * @param type of the fifth argument + * @param type of the return value + */ +@PublicEvolving +@FunctionalInterface +public interface QuintFunction { + + /** + * Applies this function to the given arguments. + * + * @param s the first function argument + * @param t the second function argument + * @param u the third function argument + * @param v the fourth function argument + * @param w the fifth function argument + * @return the function result + */ + R apply(S s, T t, U u, V v, W w); +} diff --git a/flink-core/src/main/java/org/apache/flink/util/function/QuintFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/QuintFunctionWithException.java new file mode 100644 index 0000000000000..aa93f8b2db88b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/function/QuintFunctionWithException.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util.function; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.ExceptionUtils; + +/** + * Function which takes five arguments. + * + * @param type of the first argument + * @param type of the second argument + * @param type of the third argument + * @param type of the four argument + * @param type of the five argument + * @param type of the return value + * @param type of the thrown exception + */ +@PublicEvolving +@FunctionalInterface +public interface QuintFunctionWithException { + + /** + * Applies this function to the given arguments. + * + * @param s the first function argument + * @param t the second function argument + * @param u the third function argument + * @param v the four function argument + * @param w the five function argument + * @return the function result + * @throws E if it fails + */ + R apply(S s, T t, U u, V v, W w) throws E; + + /** + * Convert at {@link QuintFunctionWithException} into a {@link QuintFunction}. + * + * @param quintFunctionWithException function with exception to convert into a function + * @param first input type + * @param second input type + * @param third input type + * @param fourth input type + * @param fifth input type + * @param output type + * @return {@link QuintFunction} which throws all checked exception as an unchecked exception. + */ + static QuintFunction unchecked( + QuintFunctionWithException quintFunctionWithException) { + return (A a, B b, C c, D d, E e) -> { + try { + return quintFunctionWithException.apply(a, b, c, d, e); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + // we need this to appease the compiler :-( + return null; + } + }; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java index f8a661f9a085f..2b39904cea375 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java @@ -71,8 +71,7 @@ public KubernetesCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobID, int maxNumberOfCheckpointsToRetain) throws Exception { final String configMapName = getConfigMapNameFunction.apply(jobID); return KubernetesUtils.createCompletedCheckpointStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 93aeab48f1206..6989a52dd44f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -846,7 +846,8 @@ public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { try { FileUtils.deleteDirectory(jobDir); - // NOTE: Instead of going through blobExpiryTimes, keep lingering entries - they + // NOTE on why blobExpiryTimes are not cleaned up: + // Instead of going through blobExpiryTimes, keep lingering entries - they // will be cleaned up by the timer task which tolerates non-existing files // If inserted again with the same IDs (via put()), the TTL will be updated // again. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java index 88c2a5689d604..81f54b1687965 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java @@ -19,6 +19,10 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; /** A factory for per Job checkpoint recovery components. */ public interface CheckpointRecoveryFactory { @@ -30,12 +34,43 @@ public interface CheckpointRecoveryFactory { * * @param jobId Job ID to recover checkpoints for * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain - * @param userClassLoader User code class loader of the job * @return {@link CompletedCheckpointStore} instance for the job */ CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception; + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception; + + /** + * Instantiates the {@link CompletedCheckpointStore} based on the passed {@code Configuration}. + * + * @param jobId The {@code JobID} for which the {@code CompletedCheckpointStore} shall be + * created. + * @param config The {@code Configuration} that shall be used (see {@link + * CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}. + * @param logger The logger that shall be used internally. + * @return The {@code CompletedCheckpointStore} instance for the given {@code Job}. + * @throws Exception if an error occurs while instantiating the {@code + * CompletedCheckpointStore}. + */ + default CompletedCheckpointStore createRecoveredCompletedCheckpointStore( + JobID jobId, Configuration config, Logger logger) throws Exception { + int maxNumberOfCheckpointsToRetain = + config.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); + + if (maxNumberOfCheckpointsToRetain <= 0) { + // warning and use 1 as the default value if the setting in + // state.checkpoints.max-retained-checkpoints is not greater than 0. + logger.warn( + "The setting for '{} : {}' is invalid. Using default value of {}", + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), + maxNumberOfCheckpointsToRetain, + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); + + maxNumberOfCheckpointsToRetain = + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); + } + + return this.createRecoveredCompletedCheckpointStore(jobId, maxNumberOfCheckpointsToRetain); + } /** * Creates a {@link CheckpointIDCounter} instance for a job. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java index 5b91bb8974ae7..02b3e95336f6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java @@ -68,7 +68,7 @@ public PerJobCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { + JobID jobId, int maxNumberOfCheckpointsToRetain) { return store.compute( jobId, (key, previous) -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index c323256ca3d86..8ad1849f03bd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -26,8 +26,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 6d66594739f1d..e2fbaa17b5595 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -47,8 +47,7 @@ public ZooKeeperCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) - throws Exception { + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { return ZooKeeperUtils.createCompletedCheckpoints( ZooKeeperUtils.useNamespaceAndEnsurePath( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java index a971e69a456d6..f0cd6582727f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java @@ -19,6 +19,10 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava30.com.google.common.collect.BiMap; +import org.apache.flink.shaded.guava30.com.google.common.collect.HashBiMap; /** The status of an application. */ public enum ApplicationStatus { @@ -37,6 +41,16 @@ public enum ApplicationStatus { // ------------------------------------------------------------------------ + private static final BiMap JOB_STATUS_APPLICATION_STATUS_BI_MAP = + HashBiMap.create(); + + static { + // only globally-terminated JobStatus have a corresponding ApplicationStatus + JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FAILED, ApplicationStatus.FAILED); + JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.CANCELED, ApplicationStatus.CANCELED); + JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED); + } + /** The associated process exit code */ private final int processExitCode; @@ -59,20 +73,20 @@ public int processExitCode() { * #UNKNOWN}. */ public static ApplicationStatus fromJobStatus(JobStatus jobStatus) { - if (jobStatus == null) { + if (jobStatus == null || !JOB_STATUS_APPLICATION_STATUS_BI_MAP.containsKey(jobStatus)) { return UNKNOWN; - } else { - switch (jobStatus) { - case FAILED: - return FAILED; - case CANCELED: - return CANCELED; - case FINISHED: - return SUCCEEDED; - - default: - return UNKNOWN; - } } + + return JOB_STATUS_APPLICATION_STATUS_BI_MAP.get(jobStatus); + } + + public static JobStatus fromApplicationStatus(ApplicationStatus applicationStatus) { + Preconditions.checkNotNull(applicationStatus, "ApplicationStatus must not be null"); + if (JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().containsKey(applicationStatus)) { + throw new UnsupportedOperationException( + applicationStatus.name() + " cannot be mapped to a JobStatus."); + } + + return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(applicationStatus); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CheckpointIDCounterCleanup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CheckpointIDCounterCleanup.java new file mode 100644 index 0000000000000..1884dac7b3e65 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CheckpointIDCounterCleanup.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.util.Preconditions; + +/** + * {@code JobCheckpointIDCounterCleanup} cleans up the checkpoint ID counter metadata of given jobs. + * + * @see org.apache.flink.runtime.checkpoint.CheckpointIDCounter + */ +// TODO: no tests provided, yet +public class CheckpointIDCounterCleanup { + + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + + public CheckpointIDCounterCleanup(CheckpointRecoveryFactory checkpointRecoveryFactory) { + this.checkpointRecoveryFactory = + Preconditions.checkNotNull(checkpointRecoveryFactory, "CheckpointRecoveryFactory"); + } + + public void cleanupJobData(JobID jobId, JobStatus jobStatus) throws Exception { + checkpointRecoveryFactory.createCheckpointIDCounter(jobId).shutdown(jobStatus); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CompletedCheckpointStoreCleanup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CompletedCheckpointStoreCleanup.java new file mode 100644 index 0000000000000..d16b7763489bc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/CompletedCheckpointStoreCleanup.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryFactory; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executor; + +/** + * {@code JobCheckpointStoreCleanup} cleans up left-over checkpoint data of given jobs. + * + * @see CompletedCheckpointStore + */ +// TODO: no tests provided, yet +public class CompletedCheckpointStoreCleanup { + + private static final Logger logger = + LoggerFactory.getLogger(CompletedCheckpointStoreCleanup.class); + + private final CheckpointsCleaner checkpointsCleaner; + private final Configuration jobManagerConfiguration; + + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + private final SharedStateRegistryFactory sharedStateRegistryFactory; + + private final Executor executor; + + public CompletedCheckpointStoreCleanup( + CheckpointsCleaner checkpointsCleaner, + Configuration jobManagerConfiguration, + CheckpointRecoveryFactory checkpointRecoveryFactory, + SharedStateRegistryFactory sharedStateRegistryFactory, + Executor executor) { + this.checkpointsCleaner = + Preconditions.checkNotNull(checkpointsCleaner, "CheckpointsCleaner"); + this.jobManagerConfiguration = + Preconditions.checkNotNull(jobManagerConfiguration, "JobManagerConfiguration"); + this.checkpointRecoveryFactory = + Preconditions.checkNotNull(checkpointRecoveryFactory, "CheckpointRecoveryFactory"); + this.sharedStateRegistryFactory = + Preconditions.checkNotNull( + sharedStateRegistryFactory, "SharedStateRegistryFactory"); + this.executor = Preconditions.checkNotNull(executor, "Executor"); + } + + public void cleanupJobData(JobID jobId, JobStatus jobStatus) throws Exception { + final CompletedCheckpointStore completedCheckpointStore = + checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore( + jobId, jobManagerConfiguration, logger); + + createSharedStateFromCheckpoints(completedCheckpointStore, jobStatus); + } + + private void createSharedStateFromCheckpoints( + CompletedCheckpointStore completedCheckpointStore, JobStatus jobStatus) + throws Exception { + try (SharedStateRegistry sharedStateRegistry = + sharedStateRegistryFactory.create(executor)) { + this.collectSharedStateFromCheckpoints(completedCheckpointStore, sharedStateRegistry); + + completedCheckpointStore.shutdown(jobStatus, checkpointsCleaner); + } + } + + private void collectSharedStateFromCheckpoints( + CompletedCheckpointStore completedCheckpointStore, + SharedStateRegistry sharedStateRegistry) + throws Exception { + // register all (shared) states from the checkpoint store with the new registry + for (CompletedCheckpoint completedCheckpoint : + completedCheckpointStore.getAllCheckpoints()) { + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 44a712bcde7a1..671d023ef1101 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -26,7 +26,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.Checkpoints; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -66,6 +68,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -122,6 +125,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint recoveredJobs; + private final Collection globallyTerminatedJobs; + private final DispatcherBootstrapFactory dispatcherBootstrapFactory; private final ExecutionGraphInfoStore executionGraphInfoStore; @@ -154,6 +159,7 @@ public Dispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { @@ -194,6 +200,8 @@ public Dispatcher( this.recoveredJobs = new HashSet<>(recoveredJobs); + this.globallyTerminatedJobs = new HashSet<>(globallyTerminatedJobs); + this.dispatcherCachedOperationsHandler = new DispatcherCachedOperationsHandler( dispatcherServices.getOperationCaches(), @@ -226,6 +234,8 @@ public void onStart() throws Exception { } startRecoveredJobs(); + cleanupDirtyJobs(); + this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create( getSelfGateway(DispatcherGateway.class), @@ -819,16 +829,7 @@ private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJo */ private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { if (cleanupHA) { - try { - jobGraphWriter.removeJobGraph(jobId); - return true; - } catch (Exception e) { - log.warn( - "Could not properly remove job {} from submitted job graph store.", - jobId, - e); - return false; - } + return cleanupJobGraph(jobId); } try { jobGraphWriter.releaseJobGraph(jobId); @@ -838,24 +839,138 @@ private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { return false; } + private void cleanupDirtyJobs() { + CheckpointRecoveryFactory checkpointRecoveryFactory; + try { + checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); + } catch (Exception e) { + log.warn( + "Cleanup of dirty jobs couldn't be performed due to a failure during CheckpointRecoveryFactory initialization.", + e); + return; + } + + final CheckpointIDCounterCleanup checkpointIDCounterCleanup = + new CheckpointIDCounterCleanup(checkpointRecoveryFactory); + final CompletedCheckpointStoreCleanup completedCheckpointStoreCleanup = + new CompletedCheckpointStoreCleanup( + new CheckpointsCleaner(), + configuration, + checkpointRecoveryFactory, + SharedStateRegistry.DEFAULT_FACTORY, + ioExecutor); + for (JobResult jobResult : globallyTerminatedJobs) { + cleanupDirtyJobDataAsync( + jobResult.getJobId(), + ApplicationStatus.fromApplicationStatus(jobResult.getApplicationStatus()), + checkpointIDCounterCleanup, + completedCheckpointStoreCleanup); + } + } + + private void cleanupDirtyJobDataAsync( + JobID jobId, + JobStatus jobStatus, + CheckpointIDCounterCleanup checkpointIDCounterCleanup, + CompletedCheckpointStoreCleanup completedCheckpointStoreCleanup) { + final List> cleanupTaskResults = new ArrayList<>(); + + cleanupTaskResults.add( + CompletableFuture.supplyAsync(() -> cleanupJobGraph(jobId), ioExecutor)); + + cleanupTaskResults.add( + CompletableFuture.supplyAsync( + () -> { + try { + checkpointIDCounterCleanup.cleanupJobData(jobId, jobStatus); + return true; + } catch (Exception e) { + log.warn( + "An error occurred while cleaning up the CheckpointIDCounter for job {}.", + jobId, + e); + return false; + } + })); + + cleanupTaskResults.add( + CompletableFuture.supplyAsync( + () -> { + try { + completedCheckpointStoreCleanup.cleanupJobData(jobId, jobStatus); + return true; + } catch (Exception e) { + log.warn( + "An error occurred while cleaning up the CompletedCheckpointStore for job {}.", + jobId, + e); + return false; + } + })); + + cleanupTaskResults.add( + CompletableFuture.supplyAsync( + () -> cleanupHighAvailabilityServices(jobId), ioExecutor)); + + cleanupTaskResults.add( + CompletableFuture.supplyAsync(() -> cleanupBlobServer(jobId, true), ioExecutor)); + + FutureUtils.combineAll(cleanupTaskResults) + .thenAccept( + cleanupResults -> { + boolean cleanupSuccessful = + cleanupResults.stream().mapToInt(result -> result ? 0 : 1).sum() + == 0; + if (cleanupSuccessful) { + markJobAsClean(jobId); + } + }); + } + private void cleanUpRemainingJobData(JobID jobId, boolean jobGraphRemoved) { jobManagerMetricGroup.removeJob(jobId); + + boolean remainingJobDataCleaned = jobGraphRemoved; if (jobGraphRemoved) { - try { - highAvailabilityServices.cleanupJobData(jobId); - } catch (Exception e) { - log.warn( - "Could not properly clean data for job {} stored by ha services", jobId, e); - } + remainingJobDataCleaned = cleanupHighAvailabilityServices(jobId); } - blobServer.cleanupJob(jobId, jobGraphRemoved); - if (jobGraphRemoved) { - try { - jobResultStore.markResultAsClean(jobId); - } catch (IOException e) { - log.warn("Could not properly mark job {} result as clean.", jobId, e); - } + remainingJobDataCleaned &= cleanupBlobServer(jobId, jobGraphRemoved); + + if (remainingJobDataCleaned) { + markJobAsClean(jobId); + } + } + + private boolean cleanupJobGraph(JobID jobId) { + try { + jobGraphWriter.removeJobGraph(jobId); + return true; + } catch (Exception e) { + log.warn("Could not properly remove job {} from submitted job graph store.", jobId, e); + return false; + } + } + + private boolean cleanupBlobServer(JobID jobId, boolean jobGraphRemoved) { + return blobServer.cleanupJob(jobId, jobGraphRemoved); + } + + private boolean cleanupHighAvailabilityServices(JobID jobId) { + try { + highAvailabilityServices.cleanupJobData(jobId); + return true; + } catch (Exception e) { + log.warn("Could not properly clean data for job {} stored by ha services", jobId, e); + return false; + } + } + + private void markJobAsClean(JobID jobId) { + try { + jobResultStore.markResultAsClean(jobId); + } catch (IOException e) { + log.warn("Could not properly mark job {} result as clean.", jobId, e); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java index b9a7d7bb7c660..1bfb7e2861e8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; @@ -31,7 +32,9 @@ Dispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, - PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) + PartialDispatcherServicesWithJobPersistenceComponents + partialDispatcherServicesWithJobPersistenceComponents) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java index b18321c44e69a..5e3e32982f551 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java @@ -22,13 +22,14 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.Executor; @@ -36,106 +37,108 @@ /** {@link Dispatcher} services container. */ public class DispatcherServices { - @Nonnull private final Configuration configuration; + private final Configuration configuration; - @Nonnull private final HighAvailabilityServices highAvailabilityServices; + private final HighAvailabilityServices highAvailabilityServices; - @Nonnull private final GatewayRetriever resourceManagerGatewayRetriever; + private final GatewayRetriever resourceManagerGatewayRetriever; - @Nonnull private final BlobServer blobServer; + private final BlobServer blobServer; - @Nonnull private final HeartbeatServices heartbeatServices; + private final HeartbeatServices heartbeatServices; - @Nonnull private final JobManagerMetricGroup jobManagerMetricGroup; + private final JobManagerMetricGroup jobManagerMetricGroup; - @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore; + private final ExecutionGraphInfoStore executionGraphInfoStore; - @Nonnull private final FatalErrorHandler fatalErrorHandler; + private final FatalErrorHandler fatalErrorHandler; - @Nonnull private final HistoryServerArchivist historyServerArchivist; + private final HistoryServerArchivist historyServerArchivist; @Nullable private final String metricQueryServiceAddress; - @Nonnull private final DispatcherOperationCaches operationCaches; + private final DispatcherOperationCaches operationCaches; - @Nonnull private final JobGraphWriter jobGraphWriter; + private final JobGraphWriter jobGraphWriter; - @Nonnull private final JobManagerRunnerFactory jobManagerRunnerFactory; + private final JobResultStore jobResultStore; - @Nonnull private final Executor ioExecutor; + private final JobManagerRunnerFactory jobManagerRunnerFactory; + + private final Executor ioExecutor; DispatcherServices( - @Nonnull Configuration configuration, - @Nonnull HighAvailabilityServices highAvailabilityServices, - @Nonnull GatewayRetriever resourceManagerGatewayRetriever, - @Nonnull BlobServer blobServer, - @Nonnull HeartbeatServices heartbeatServices, - @Nonnull ExecutionGraphInfoStore executionGraphInfoStore, - @Nonnull FatalErrorHandler fatalErrorHandler, - @Nonnull HistoryServerArchivist historyServerArchivist, + Configuration configuration, + HighAvailabilityServices highAvailabilityServices, + GatewayRetriever resourceManagerGatewayRetriever, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + ExecutionGraphInfoStore executionGraphInfoStore, + FatalErrorHandler fatalErrorHandler, + HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, - @Nonnull DispatcherOperationCaches operationCaches, - @Nonnull JobManagerMetricGroup jobManagerMetricGroup, - @Nonnull JobGraphWriter jobGraphWriter, - @Nonnull JobManagerRunnerFactory jobManagerRunnerFactory, - @Nonnull Executor ioExecutor) { - this.configuration = configuration; - this.highAvailabilityServices = highAvailabilityServices; - this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever; - this.blobServer = blobServer; - this.heartbeatServices = heartbeatServices; - this.executionGraphInfoStore = executionGraphInfoStore; - this.fatalErrorHandler = fatalErrorHandler; - this.historyServerArchivist = historyServerArchivist; + DispatcherOperationCaches operationCaches, + JobManagerMetricGroup jobManagerMetricGroup, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore, + JobManagerRunnerFactory jobManagerRunnerFactory, + Executor ioExecutor) { + this.configuration = Preconditions.checkNotNull(configuration, "Configuration"); + this.highAvailabilityServices = + Preconditions.checkNotNull(highAvailabilityServices, "HighAvailabilityServices"); + this.resourceManagerGatewayRetriever = + Preconditions.checkNotNull( + resourceManagerGatewayRetriever, "ResourceManagerGatewayRetriever"); + this.blobServer = Preconditions.checkNotNull(blobServer, "BlobServer"); + this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices, "HeartBeatServices"); + this.executionGraphInfoStore = + Preconditions.checkNotNull(executionGraphInfoStore, "ExecutionGraphInfoStore"); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler, "FatalErrorHandler"); + this.historyServerArchivist = + Preconditions.checkNotNull(historyServerArchivist, "HistoryServerArchivist"); this.metricQueryServiceAddress = metricQueryServiceAddress; - this.operationCaches = operationCaches; - this.jobManagerMetricGroup = jobManagerMetricGroup; - this.jobGraphWriter = jobGraphWriter; - this.jobManagerRunnerFactory = jobManagerRunnerFactory; - this.ioExecutor = ioExecutor; + this.operationCaches = Preconditions.checkNotNull(operationCaches, "OperationCaches"); + this.jobManagerMetricGroup = + Preconditions.checkNotNull(jobManagerMetricGroup, "JobManagerMetricGroup"); + this.jobGraphWriter = Preconditions.checkNotNull(jobGraphWriter, "JobGraphWriter"); + this.jobResultStore = Preconditions.checkNotNull(jobResultStore, "JobResultStore"); + this.jobManagerRunnerFactory = + Preconditions.checkNotNull(jobManagerRunnerFactory, "JobManagerRunnerFactory"); + this.ioExecutor = Preconditions.checkNotNull(ioExecutor, "IOExecutor"); } - @Nonnull public Configuration getConfiguration() { return configuration; } - @Nonnull public HighAvailabilityServices getHighAvailabilityServices() { return highAvailabilityServices; } - @Nonnull public GatewayRetriever getResourceManagerGatewayRetriever() { return resourceManagerGatewayRetriever; } - @Nonnull public BlobServer getBlobServer() { return blobServer; } - @Nonnull public HeartbeatServices getHeartbeatServices() { return heartbeatServices; } - @Nonnull public JobManagerMetricGroup getJobManagerMetricGroup() { return jobManagerMetricGroup; } - @Nonnull public ExecutionGraphInfoStore getArchivedExecutionGraphStore() { return executionGraphInfoStore; } - @Nonnull public FatalErrorHandler getFatalErrorHandler() { return fatalErrorHandler; } - @Nonnull public HistoryServerArchivist getHistoryServerArchivist() { return historyServerArchivist; } @@ -145,47 +148,50 @@ public String getMetricQueryServiceAddress() { return metricQueryServiceAddress; } - @Nonnull public DispatcherOperationCaches getOperationCaches() { return operationCaches; } - @Nonnull public JobGraphWriter getJobGraphWriter() { return jobGraphWriter; } - @Nonnull + public JobResultStore getJobResultStore() { + return jobResultStore; + } + JobManagerRunnerFactory getJobManagerRunnerFactory() { return jobManagerRunnerFactory; } - @Nonnull public Executor getIoExecutor() { return ioExecutor; } public static DispatcherServices from( - @Nonnull - PartialDispatcherServicesWithJobGraphStore - partialDispatcherServicesWithJobGraphStore, - @Nonnull JobManagerRunnerFactory jobManagerRunnerFactory) { + PartialDispatcherServicesWithJobPersistenceComponents + partialDispatcherServicesWithJobPersistenceComponents, + JobManagerRunnerFactory jobManagerRunnerFactory) { return new DispatcherServices( - partialDispatcherServicesWithJobGraphStore.getConfiguration(), - partialDispatcherServicesWithJobGraphStore.getHighAvailabilityServices(), - partialDispatcherServicesWithJobGraphStore.getResourceManagerGatewayRetriever(), - partialDispatcherServicesWithJobGraphStore.getBlobServer(), - partialDispatcherServicesWithJobGraphStore.getHeartbeatServices(), - partialDispatcherServicesWithJobGraphStore.getArchivedExecutionGraphStore(), - partialDispatcherServicesWithJobGraphStore.getFatalErrorHandler(), - partialDispatcherServicesWithJobGraphStore.getHistoryServerArchivist(), - partialDispatcherServicesWithJobGraphStore.getMetricQueryServiceAddress(), - partialDispatcherServicesWithJobGraphStore.getOperationCaches(), - partialDispatcherServicesWithJobGraphStore + partialDispatcherServicesWithJobPersistenceComponents.getConfiguration(), + partialDispatcherServicesWithJobPersistenceComponents.getHighAvailabilityServices(), + partialDispatcherServicesWithJobPersistenceComponents + .getResourceManagerGatewayRetriever(), + partialDispatcherServicesWithJobPersistenceComponents.getBlobServer(), + partialDispatcherServicesWithJobPersistenceComponents.getHeartbeatServices(), + partialDispatcherServicesWithJobPersistenceComponents + .getArchivedExecutionGraphStore(), + partialDispatcherServicesWithJobPersistenceComponents.getFatalErrorHandler(), + partialDispatcherServicesWithJobPersistenceComponents.getHistoryServerArchivist(), + partialDispatcherServicesWithJobPersistenceComponents + .getMetricQueryServiceAddress(), + partialDispatcherServicesWithJobPersistenceComponents.getOperationCaches(), + partialDispatcherServicesWithJobPersistenceComponents .getJobManagerMetricGroupFactory() .create(), - partialDispatcherServicesWithJobGraphStore.getJobGraphWriter(), + partialDispatcherServicesWithJobPersistenceComponents.getJobGraphWriter(), + partialDispatcherServicesWithJobPersistenceComponents.getJobResultStore(), jobManagerRunnerFactory, - partialDispatcherServicesWithJobGraphStore.getIoExecutor()); + partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java index 82c225b3a554b..75641f76769ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; @@ -38,13 +39,15 @@ public MiniDispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, - PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) + PartialDispatcherServicesWithJobPersistenceComponents + partialDispatcherServicesWithJobPersistenceComponents) throws Exception { final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs); final Configuration configuration = - partialDispatcherServicesWithJobGraphStore.getConfiguration(); + partialDispatcherServicesWithJobPersistenceComponents.getConfiguration(); final String executionModeValue = configuration.getString(INTERNAL_CLUSTER_EXECUTION_MODE); final ClusterEntrypoint.ExecutionMode executionMode = ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue); @@ -53,7 +56,7 @@ public MiniDispatcher createDispatcher( rpcService, fencingToken, DispatcherServices.from( - partialDispatcherServicesWithJobGraphStore, + partialDispatcherServicesWithJobPersistenceComponents, JobMasterServiceLeadershipRunnerFactory.INSTANCE), jobGraph, dispatcherBootstrapFactory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 83f173db8c750..dda8e6da88350 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -63,6 +63,7 @@ public MiniDispatcher( rpcService, fencingToken, Collections.singleton(jobGraph), + Collections.emptyList(), dispatcherBootstrapFactory, dispatcherServices); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java similarity index 67% rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java index c3fbecdc9c2b2..c6fe7b7eaa579 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java @@ -22,35 +22,38 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.Executor; /** {@link DispatcherFactory} services container. */ -public class PartialDispatcherServicesWithJobGraphStore extends PartialDispatcherServices { +public class PartialDispatcherServicesWithJobPersistenceComponents + extends PartialDispatcherServices { - @Nonnull private final JobGraphWriter jobGraphWriter; + private final JobGraphWriter jobGraphWriter; + private final JobResultStore jobResultStore; - private PartialDispatcherServicesWithJobGraphStore( - @Nonnull Configuration configuration, - @Nonnull HighAvailabilityServices highAvailabilityServices, - @Nonnull GatewayRetriever resourceManagerGatewayRetriever, - @Nonnull BlobServer blobServer, - @Nonnull HeartbeatServices heartbeatServices, - @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory, - @Nonnull ExecutionGraphInfoStore executionGraphInfoStore, - @Nonnull FatalErrorHandler fatalErrorHandler, - @Nonnull HistoryServerArchivist historyServerArchivist, + private PartialDispatcherServicesWithJobPersistenceComponents( + Configuration configuration, + HighAvailabilityServices highAvailabilityServices, + GatewayRetriever resourceManagerGatewayRetriever, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + JobManagerMetricGroupFactory jobManagerMetricGroupFactory, + ExecutionGraphInfoStore executionGraphInfoStore, + FatalErrorHandler fatalErrorHandler, + HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, - @Nonnull Executor ioExecutor, - @Nonnull DispatcherOperationCaches operationCaches, - @Nonnull JobGraphWriter jobGraphWriter) { + Executor ioExecutor, + DispatcherOperationCaches operationCaches, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore) { super( configuration, highAvailabilityServices, @@ -65,16 +68,22 @@ private PartialDispatcherServicesWithJobGraphStore( ioExecutor, operationCaches); this.jobGraphWriter = jobGraphWriter; + this.jobResultStore = jobResultStore; } - @Nonnull public JobGraphWriter getJobGraphWriter() { return jobGraphWriter; } - public static PartialDispatcherServicesWithJobGraphStore from( - PartialDispatcherServices partialDispatcherServices, JobGraphWriter jobGraphWriter) { - return new PartialDispatcherServicesWithJobGraphStore( + public JobResultStore getJobResultStore() { + return jobResultStore; + } + + public static PartialDispatcherServicesWithJobPersistenceComponents from( + PartialDispatcherServices partialDispatcherServices, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore) { + return new PartialDispatcherServicesWithJobPersistenceComponents( partialDispatcherServices.getConfiguration(), partialDispatcherServices.getHighAvailabilityServices(), partialDispatcherServices.getResourceManagerGatewayRetriever(), @@ -87,6 +96,7 @@ public static PartialDispatcherServicesWithJobGraphStore from( partialDispatcherServices.getMetricQueryServiceAddress(), partialDispatcherServices.getIoExecutor(), partialDispatcherServices.getOperationCaches(), - jobGraphWriter); + jobGraphWriter, + jobResultStore); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java index 55d92efa366ec..bf41f51d4431e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; @@ -32,17 +33,20 @@ public StandaloneDispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, - PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) + PartialDispatcherServicesWithJobPersistenceComponents + partialDispatcherServicesWithJobPersistenceComponents) throws Exception { // create the default dispatcher return new StandaloneDispatcher( rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, dispatcherBootstrapFactory, DispatcherServices.from( - partialDispatcherServicesWithJobGraphStore, + partialDispatcherServicesWithJobPersistenceComponents, JobMasterServiceLeadershipRunnerFactory.INSTANCE)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index 1763bb75fc494..36886ef1ef145 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -20,6 +20,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import java.util.Collection; @@ -34,6 +35,7 @@ public StandaloneDispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { @@ -41,6 +43,7 @@ public StandaloneDispatcher( rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, dispatcherBootstrapFactory, dispatcherServices); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java index b5bf60f19a7bb..70b6033561129 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java @@ -24,8 +24,10 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.util.AutoCloseableAsync; @@ -176,16 +178,14 @@ private void handleUnexpectedDispatcherServiceTermination( createdDispatcherService .getTerminationFuture() .whenComplete( - (ignored, throwable) -> { - runIfStateIs( - State.RUNNING, - () -> { - handleError( - new FlinkException( - "Unexpected termination of DispatcherService.", - throwable)); - }); - }); + (ignored, throwable) -> + runIfStateIs( + State.RUNNING, + () -> + handleError( + new FlinkException( + "Unexpected termination of DispatcherService.", + throwable)))); } final Optional supplyUnsynchronizedIfRunning(Supplier supplier) { @@ -259,7 +259,9 @@ public interface DispatcherGatewayServiceFactory { DispatcherGatewayService create( DispatcherId fencingToken, Collection recoveredJobs, - JobGraphWriter jobGraphWriter); + Collection globallyTerminatedJobs, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore); } /** An accessor of the {@link DispatcherGateway}. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java index e521c4a75e635..28469929b83ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java @@ -23,9 +23,11 @@ import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkRuntimeException; @@ -54,7 +56,9 @@ class DefaultDispatcherGatewayServiceFactory public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( DispatcherId fencingToken, Collection recoveredJobs, - JobGraphWriter jobGraphWriter) { + Collection globallyTerminatedJobs, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore) { final Dispatcher dispatcher; try { @@ -63,10 +67,11 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), - PartialDispatcherServicesWithJobGraphStore.from( - partialDispatcherServices, jobGraphWriter)); + PartialDispatcherServicesWithJobPersistenceComponents.from( + partialDispatcherServices, jobGraphWriter, jobResultStore)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java index 9d5c4d62dd4ff..2050271db1634 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -44,7 +44,7 @@ public DefaultDispatcherRunnerFactory( public DispatcherRunner createDispatcherRunner( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) @@ -52,7 +52,7 @@ public DispatcherRunner createDispatcherRunner( final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory( - jobGraphStoreFactory, + jobPersistenceComponentFactory, ioExecutor, rpcService, partialDispatcherServices, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java index 6792fa32ecccc..222c349870100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -31,7 +31,7 @@ public interface DispatcherLeaderProcessFactoryFactory { DispatcherLeaderProcessFactory createFactory( - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java index a03be4cf42b48..bebafa2bbf0c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.dispatcher.runner; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -32,7 +32,7 @@ public interface DispatcherRunnerFactory { DispatcherRunner createDispatcherRunner( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java index f50b396ba0a36..67b4ac401c32b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.dispatcher.runner; import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.ThrowingJobGraphWriter; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -49,7 +50,9 @@ protected void onStart() { dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), - ThrowingJobGraphWriter.INSTANCE); + Collections.emptyList(), + ThrowingJobGraphWriter.INSTANCE, + new EmbeddedJobResultStore()); completeDispatcherSetup(dispatcherService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java index 7be59105478be..424774c3b4b70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.FlinkException; @@ -42,7 +42,7 @@ private JobDispatcherLeaderProcessFactoryFactory(JobGraphRetriever jobGraphRetri @Override public DispatcherLeaderProcessFactory createFactory( - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java index efc6b2098a997..9c9f749cba681 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java @@ -23,8 +23,10 @@ import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.util.ExceptionUtils; @@ -33,14 +35,17 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.FunctionUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * Process which encapsulates the job recovery logic and life cycle management of a {@link @@ -53,6 +58,8 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc private final JobGraphStore jobGraphStore; + private final JobResultStore jobResultStore; + private final Executor ioExecutor; private CompletableFuture onGoingRecoveryOperation = FutureUtils.completedVoidFuture(); @@ -61,12 +68,14 @@ private SessionDispatcherLeaderProcess( UUID leaderSessionId, DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory, JobGraphStore jobGraphStore, + JobResultStore jobResultStore, Executor ioExecutor, FatalErrorHandler fatalErrorHandler) { super(leaderSessionId, fatalErrorHandler); this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory; this.jobGraphStore = jobGraphStore; + this.jobResultStore = jobResultStore; this.ioExecutor = ioExecutor; } @@ -75,9 +84,7 @@ protected void onStart() { startServices(); onGoingRecoveryOperation = - recoverJobsAsync() - .thenAccept(this::createDispatcherIfRunning) - .handle(this::onErrorIfRunning); + createDispatcherBasedOnRecoveredJobGraphsAndGloballyTerminatedJobs(); } private void startServices() { @@ -92,34 +99,61 @@ private void startServices() { } } - private void createDispatcherIfRunning(Collection jobGraphs) { - runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs)); + private void createDispatcherIfRunning( + Collection jobGraphs, Collection globallyTerminatedJobs) { + runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, globallyTerminatedJobs)); } - private void createDispatcher(Collection jobGraphs) { + private void createDispatcher( + Collection jobGraphs, Collection globallyTerminatedJobs) { final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( - DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore); + DispatcherId.fromUuid(getLeaderSessionId()), + jobGraphs, + globallyTerminatedJobs, + jobGraphStore, + jobResultStore); completeDispatcherSetup(dispatcherService); } - private CompletableFuture> recoverJobsAsync() { - return CompletableFuture.supplyAsync(this::recoverJobsIfRunning, ioExecutor); + private CompletableFuture + createDispatcherBasedOnRecoveredJobGraphsAndGloballyTerminatedJobs() { + return CompletableFuture.supplyAsync( + this::getGloballyCompletedJobResultsIfRunning, ioExecutor) + .thenCompose( + globallyTerminatedJobs -> + CompletableFuture.supplyAsync( + () -> + this.recoverJobsIfRunning( + globallyTerminatedJobs.stream() + .map(JobResult::getJobId) + .collect( + Collectors + .toSet())), + ioExecutor) + .thenAccept( + jobGraphs -> + createDispatcherIfRunning( + jobGraphs, globallyTerminatedJobs)) + .handle(this::onErrorIfRunning)); } - private Collection recoverJobsIfRunning() { - return supplyUnsynchronizedIfRunning(this::recoverJobs).orElse(Collections.emptyList()); + private Collection recoverJobsIfRunning(Set globallyTerminatedJobs) { + return supplyUnsynchronizedIfRunning(() -> recoverJobs(globallyTerminatedJobs)) + .orElse(Collections.emptyList()); } - private Collection recoverJobs() { + private Collection recoverJobs(Set globallyTerminatedJobs) { log.info("Recover all persisted job graphs."); final Collection jobIds = getJobIds(); final Collection recoveredJobGraphs = new ArrayList<>(); for (JobID jobId : jobIds) { - recoveredJobGraphs.add(recoverJob(jobId)); + if (!globallyTerminatedJobs.contains(jobId)) { + recoveredJobGraphs.add(recoverJob(jobId)); + } } log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size()); @@ -145,6 +179,21 @@ private JobGraph recoverJob(JobID jobId) { } } + private Collection getGloballyCompletedJobResultsIfRunning() { + return supplyUnsynchronizedIfRunning(this::getGloballyCompletedJobResults) + .orElse(Collections.emptyList()); + } + + private Collection getGloballyCompletedJobResults() { + try { + return jobResultStore.getDirtyResults(); + } catch (IOException e) { + throw new FlinkRuntimeException( + "Could not retrieve JobResults of globally-terminated jobs from JobResultStore", + e); + } + } + @Override protected CompletableFuture onClose() { return CompletableFuture.runAsync(this::stopServices, ioExecutor); @@ -261,9 +310,15 @@ public static SessionDispatcherLeaderProcess create( UUID leaderSessionId, DispatcherGatewayServiceFactory dispatcherFactory, JobGraphStore jobGraphStore, + JobResultStore jobResultStore, Executor ioExecutor, FatalErrorHandler fatalErrorHandler) { return new SessionDispatcherLeaderProcess( - leaderSessionId, dispatcherFactory, jobGraphStore, ioExecutor, fatalErrorHandler); + leaderSessionId, + dispatcherFactory, + jobGraphStore, + jobResultStore, + ioExecutor, + fatalErrorHandler); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java index 95cf4e49e10ee..8c08bb0d0b39c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.dispatcher.runner; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import java.util.UUID; @@ -31,18 +31,18 @@ public class SessionDispatcherLeaderProcessFactory implements DispatcherLeaderPr private final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory; - private final JobGraphStoreFactory jobGraphStoreFactory; + private final JobPersistenceComponentFactory jobPersistenceComponentFactory; private final Executor ioExecutor; private final FatalErrorHandler fatalErrorHandler; public SessionDispatcherLeaderProcessFactory( AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory, - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, FatalErrorHandler fatalErrorHandler) { this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory; - this.jobGraphStoreFactory = jobGraphStoreFactory; + this.jobPersistenceComponentFactory = jobPersistenceComponentFactory; this.ioExecutor = ioExecutor; this.fatalErrorHandler = fatalErrorHandler; } @@ -52,7 +52,8 @@ public DispatcherLeaderProcess create(UUID leaderSessionID) { return SessionDispatcherLeaderProcess.create( leaderSessionID, dispatcherGatewayServiceFactory, - jobGraphStoreFactory.create(), + jobPersistenceComponentFactory.createJobGraphStore(), + jobPersistenceComponentFactory.createJobResultStore(), ioExecutor, fatalErrorHandler); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java index 29789650ba4f3..ee2fb081647fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java @@ -20,7 +20,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -38,7 +38,7 @@ private SessionDispatcherLeaderProcessFactoryFactory(DispatcherFactory dispatche @Override public DispatcherLeaderProcessFactory createFactory( - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices, @@ -50,7 +50,7 @@ public DispatcherLeaderProcessFactory createFactory( return new SessionDispatcherLeaderProcessFactory( dispatcherGatewayServiceFactory, - jobGraphStoreFactory, + jobPersistenceComponentFactory, ioExecutor, fatalErrorHandler); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index 83f630ee98a33..53f7a95e4ebf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.jobmanager.HaServicesJobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.HaServicesJobPersistenceComponentFactory; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -218,7 +218,7 @@ public DispatcherResourceManagerComponent create( dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, - new HaServicesJobGraphStoreFactory(highAvailabilityServices), + new HaServicesJobPersistenceComponentFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobGraphStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java similarity index 59% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobGraphStoreFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java index ae8d6ca5cc7a7..05dce66c3aac4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobGraphStoreFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java @@ -19,28 +19,40 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.SupplierWithException; /** - * {@link JobGraphStoreFactory} implementation which creates a {@link JobGraphStore} using the - * provided {@link HighAvailabilityServices}. + * {@link JobPersistenceComponentFactory} implementation which creates a {@link JobGraphStore} using + * the provided {@link HighAvailabilityServices}. */ -public class HaServicesJobGraphStoreFactory implements JobGraphStoreFactory { +public class HaServicesJobPersistenceComponentFactory implements JobPersistenceComponentFactory { private final HighAvailabilityServices highAvailabilityServices; - public HaServicesJobGraphStoreFactory(HighAvailabilityServices highAvailabilityServices) { + public HaServicesJobPersistenceComponentFactory( + HighAvailabilityServices highAvailabilityServices) { this.highAvailabilityServices = highAvailabilityServices; } @Override - public JobGraphStore create() { + public JobGraphStore createJobGraphStore() { + return create(highAvailabilityServices::getJobGraphStore, JobGraphStore.class); + } + + @Override + public JobResultStore createJobResultStore() { + return create(highAvailabilityServices::getJobResultStore, JobResultStore.class); + } + + private T create(SupplierWithException supplier, Class clazz) { try { - return highAvailabilityServices.getJobGraphStore(); + return supplier.get(); } catch (Exception e) { throw new FlinkRuntimeException( String.format( "Could not create %s from %s.", - JobGraphStore.class.getSimpleName(), + clazz.getSimpleName(), highAvailabilityServices.getClass().getSimpleName()), e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java similarity index 70% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphStoreFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java index 1530509003981..7d7a80bdd3461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphStoreFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java @@ -18,13 +18,22 @@ package org.apache.flink.runtime.jobmanager; -/** Factory for {@link JobGraphStore}. */ -public interface JobGraphStoreFactory { +import org.apache.flink.runtime.highavailability.JobResultStore; + +/** Factory for components that are responsible for persisting a job for recovery. */ +public interface JobPersistenceComponentFactory { /** * Creates a {@link JobGraphStore}. * * @return a {@link JobGraphStore} instance */ - JobGraphStore create(); + JobGraphStore createJobGraphStore(); + + /** + * Creates {@link JobResultStore} instances. + * + * @return {@code JobResultStore} instances. + */ + JobResultStore createJobResultStore(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 7298fbaa2e36d..a73393f7af1c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -95,6 +96,7 @@ public SchedulerNG createScheduler( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, @@ -116,6 +118,7 @@ public SchedulerNG createScheduler( futureExecutor, userCodeLoader, checkpointRecoveryFactory, + checkpointsCleaner, rpcTimeout, blobWriter, jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java index 4320ae9171ad0..cf4208253bcfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -55,6 +56,8 @@ public class JobManagerSharedServices { private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10); + private final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + private final ScheduledExecutorService futureExecutor; private final ExecutorService ioExecutor; @@ -100,6 +103,10 @@ public BlobWriter getBlobWriter() { return blobWriter; } + public CheckpointsCleaner getCheckpointsCleaner() { + return checkpointsCleaner; + } + /** * Shutdown the {@link JobMaster} services. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 99cf0a64cf96f..5fc797b92fb9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -145,6 +146,8 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint private final HighAvailabilityServices highAvailabilityServices; + private final CheckpointsCleaner checkpointsCleaner; + private final BlobWriter blobWriter; private final HeartbeatServices heartbeatServices; @@ -281,6 +284,7 @@ public void onUnknownDeploymentsOf( this.blobWriter = jobManagerSharedServices.getBlobWriter(); this.futureExecutor = jobManagerSharedServices.getFutureExecutor(); this.ioExecutor = jobManagerSharedServices.getIoExecutor(); + this.checkpointsCleaner = jobManagerSharedServices.getCheckpointsCleaner(); this.jobCompletionActions = checkNotNull(jobCompletionActions); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); @@ -353,6 +357,7 @@ private SchedulerNG createScheduler( futureExecutor, userCodeLoader, highAvailabilityServices.getCheckpointRecoveryFactory(), + checkpointsCleaner, rpcTimeout, blobWriter, jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java index b655602b168d2..887a833e6b63c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -72,6 +73,7 @@ SchedulerNG createScheduler( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index fdfa2384dda1b..62b0992f6a474 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -142,7 +142,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio jobGraph, ioExecutor, jobMasterConfiguration, - userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index ec39bb9d69aa5..3d5a4a304239e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -60,6 +60,7 @@ public SchedulerNG createInstance( final ScheduledExecutorService futureExecutor, final ClassLoader userCodeLoader, final CheckpointRecoveryFactory checkpointRecoveryFactory, + final CheckpointsCleaner checkpointsCleaner, final Time rpcTimeout, final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, @@ -123,7 +124,7 @@ public SchedulerNG createInstance( schedulerComponents.getStartUpAction(), new ScheduledExecutorServiceAdapter(futureExecutor), userCodeLoader, - new CheckpointsCleaner(), + checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulerComponents.getSchedulingStrategyFactory(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9f195d6103064..262424d4dd7f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -161,7 +161,6 @@ public SchedulerBase( final JobGraph jobGraph, final Executor ioExecutor, final Configuration jobMasterConfiguration, - final ClassLoader userCodeLoader, final CheckpointsCleaner checkpointsCleaner, final CheckpointRecoveryFactory checkpointRecoveryFactory, final JobManagerJobMetricGroup jobManagerJobMetricGroup, @@ -185,7 +184,6 @@ public SchedulerBase( SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled( jobGraph, jobMasterConfiguration, - userCodeLoader, checkNotNull(checkpointRecoveryFactory), log); this.checkpointIdCounter = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java index 27b92b3d8a846..0c03c8da1049d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -51,6 +52,7 @@ SchedulerNG createInstance( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java index 481baa3bb8516..99126acd52cd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -18,9 +18,7 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -44,15 +42,14 @@ private SchedulerUtils() { public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpointingIsEnabled( JobGraph jobGraph, Configuration configuration, - ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Logger log) throws JobExecutionException { final JobID jobId = jobGraph.getJobID(); if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { try { - return createCompletedCheckpointStore( - configuration, userCodeLoader, checkpointRecoveryFactory, log, jobId); + return checkpointRecoveryFactory.createRecoveredCompletedCheckpointStore( + jobId, configuration, log); } catch (Exception e) { throw new JobExecutionException( jobId, @@ -64,34 +61,6 @@ public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpoin } } - @VisibleForTesting - static CompletedCheckpointStore createCompletedCheckpointStore( - Configuration jobManagerConfig, - ClassLoader classLoader, - CheckpointRecoveryFactory recoveryFactory, - Logger log, - JobID jobId) - throws Exception { - int maxNumberOfCheckpointsToRetain = - jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); - - if (maxNumberOfCheckpointsToRetain <= 0) { - // warning and use 1 as the default value if the setting in - // state.checkpoints.max-retained-checkpoints is not greater than 0. - log.warn( - "The setting for '{} : {}' is invalid. Using default value of {}", - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), - maxNumberOfCheckpointsToRetain, - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); - - maxNumberOfCheckpointsToRetain = - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); - } - - return recoveryFactory.createRecoveredCompletedCheckpointStore( - jobId, maxNumberOfCheckpointsToRetain, classLoader); - } - public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnabled( JobGraph jobGraph, CheckpointRecoveryFactory checkpointRecoveryFactory) throws JobExecutionException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 01014e3aa1b4b..9d3b63a2fd5c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -242,11 +242,7 @@ public AdaptiveScheduler( this.checkpointsCleaner = checkpointsCleaner; this.completedCheckpointStore = SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled( - jobGraph, - configuration, - userCodeClassLoader, - checkpointRecoveryFactory, - LOG); + jobGraph, configuration, checkpointRecoveryFactory, LOG); this.checkpointIdCounter = SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled( jobGraph, checkpointRecoveryFactory); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index 61236a1c18e7f..694f66d261c7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -70,6 +70,7 @@ public SchedulerNG createInstance( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, @@ -126,7 +127,7 @@ public SchedulerNG createInstance( slotAllocator, ioExecutor, userCodeLoader, - new CheckpointsCleaner(), + checkpointsCleaner, checkpointRecoveryFactory, initialResourceAllocationTimeout, resourceStabilizationTimeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java new file mode 100644 index 0000000000000..b1c1f03a93c7a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactoryTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code CheckpointRecoveryFactoryTest} tests the default functionality of {@link + * CheckpointRecoveryFactory}. + */ +public class CheckpointRecoveryFactoryTest { + + private static final Logger log = LoggerFactory.getLogger(CheckpointRecoveryFactoryTest.class); + + @ParameterizedTest(name = "actual: {0}; expected: {1}") + @CsvSource({"10,10", "0,1", "-1,1"}) + public void testMaxRemainingCheckpointsParameterSetting(int actualValue, int expectedValue) + throws Exception { + final JobID expectedJobId = new JobID(); + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, actualValue); + + final TestCheckpointRecoveryFactory testInstance = new TestCheckpointRecoveryFactory(); + assertThat( + testInstance.createRecoveredCompletedCheckpointStore( + expectedJobId, jobManagerConfig, log)) + .isNull(); + + assertThat(testInstance.getActualJobId()).isEqualTo(expectedJobId); + assertThat(testInstance.getActualMaximumNumberOfRetainedCheckpointsParamValue()) + .isEqualTo(expectedValue); + } + + private static class TestCheckpointRecoveryFactory implements CheckpointRecoveryFactory { + + private JobID actualJobId; + private int actualMaximumNumberOfRetainedCheckpointsParamValue; + + @Nullable + @Override + public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( + JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception { + this.actualJobId = jobId; + this.actualMaximumNumberOfRetainedCheckpointsParamValue = + maxNumberOfCheckpointsToRetain; + + return null; + } + + @Override + public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception { + throw new UnsupportedOperationException("createCheckpointIDCounter is not implemented"); + } + + public JobID getActualJobId() { + return actualJobId; + } + + public int getActualMaximumNumberOfRetainedCheckpointsParamValue() { + return actualMaximumNumberOfRetainedCheckpointsParamValue; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java index 6b65d00e4f091..6912299195a6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java @@ -38,21 +38,16 @@ public void testFactoryWithoutCheckpointStoreRecovery() throws Exception { final CheckpointRecoveryFactory factory = PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( maxCheckpoints -> store); - final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final JobID firstJobId = new JobID(); - assertSame( - store, factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + assertSame(store, factory.createRecoveredCompletedCheckpointStore(firstJobId, 1)); assertThrows( UnsupportedOperationException.class, - () -> factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader)); + () -> factory.createRecoveredCompletedCheckpointStore(firstJobId, 1)); final JobID secondJobId = new JobID(); - assertSame( - store, - factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + assertSame(store, factory.createRecoveredCompletedCheckpointStore(secondJobId, 1)); assertThrows( UnsupportedOperationException.class, - () -> factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader)); + () -> factory.createRecoveredCompletedCheckpointStore(secondJobId, 1)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java index f4e9256922a1f..b5133d523db7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java @@ -33,7 +33,7 @@ public TestingCheckpointRecoveryFactory( @Override public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( - JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) { + JobID jobId, int maxNumberOfCheckpointsToRetain) { return store; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java index 5ee43aebc276d..df5630d0bc080 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.TestLogger; import org.apache.flink.util.TimeUtils; @@ -142,6 +144,8 @@ public class TestingDispatcherBuilder { private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + private JobResultStore jobResultStore = TestingJobResultStore.builder().build(); + private FatalErrorHandler fatalErrorHandler = testingFatalErrorHandlerResource.getFatalErrorHandler(); @@ -171,6 +175,11 @@ TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) { return this; } + TestingDispatcherBuilder setJobResultStore(JobResultStore jobResultStore) { + this.jobResultStore = jobResultStore; + return this; + } + public TestingDispatcherBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) { this.fatalErrorHandler = fatalErrorHandler; return this; @@ -187,6 +196,7 @@ TestingDispatcher build() throws Exception { rpcService, DispatcherId.generate(), initialJobGraphs, + Collections.emptyList(), dispatcherBootstrapFactory, new DispatcherServices( configuration, @@ -201,6 +211,7 @@ TestingDispatcher build() throws Exception { new DispatcherOperationCaches(), UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), jobGraphWriter, + jobResultStore, jobManagerRunnerFactory, ForkJoinPool.commonPool())); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 8427d6f684c2f..291fd5f1b9afe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -208,6 +208,7 @@ private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) th rpcService, DispatcherId.generate(), Collections.emptyList(), + Collections.emptyList(), (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), new DispatcherServices( @@ -223,6 +224,7 @@ private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) th new DispatcherOperationCaches(), UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), jobGraphWriter, + jobResultStore, jobManagerRunnerFactory, ForkJoinPool.commonPool())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 03bd399422335..06bd91ff09cee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -266,6 +266,7 @@ private MiniDispatcher createMiniDispatcher(ClusterEntrypoint.ExecutionMode exec new DispatcherOperationCaches(), UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), highAvailabilityServices.getJobGraphStore(), + highAvailabilityServices.getJobResultStore(), testingJobManagerRunnerFactory, ForkJoinPool.commonPool()), jobGraph, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 918d4f3cce0ba..b35ea525a2663 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; @@ -39,6 +40,7 @@ class TestingDispatcher extends Dispatcher { RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { @@ -46,6 +48,7 @@ class TestingDispatcher extends Dispatcher { rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, dispatcherBootstrapFactory, dispatcherServices); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java index 03bf6dff8c3ee..2d32cfdb94176 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java @@ -31,17 +31,21 @@ import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory; import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; -import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore; +import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents; import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory; import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; +import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -95,6 +99,8 @@ public class DefaultDispatcherRunnerITCase extends TestLogger { private JobGraphStore jobGraphStore; + private JobResultStore jobResultStore; + private PartialDispatcherServices partialDispatcherServices; private DefaultDispatcherRunnerFactory dispatcherRunnerFactory; @@ -108,6 +114,7 @@ public void setup() { dispatcherLeaderElectionService = new TestingLeaderElectionService(); fatalErrorHandler = new TestingFatalErrorHandler(); jobGraphStore = TestingJobGraphStore.newBuilder().build(); + jobResultStore = new EmbeddedJobResultStore(); partialDispatcherServices = new PartialDispatcherServices( @@ -240,17 +247,20 @@ public Dispatcher createDispatcher( RpcService rpcService, DispatcherId fencingToken, Collection recoveredJobs, + Collection globallyTerminatedJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, - PartialDispatcherServicesWithJobGraphStore - partialDispatcherServicesWithJobGraphStore) + PartialDispatcherServicesWithJobPersistenceComponents + partialDispatcherServicesWithJobPersistenceComponents) throws Exception { return new StandaloneDispatcher( rpcService, fencingToken, recoveredJobs, + globallyTerminatedJobs, dispatcherBootstrapFactory, DispatcherServices.from( - partialDispatcherServicesWithJobGraphStore, jobManagerRunnerFactory)); + partialDispatcherServicesWithJobPersistenceComponents, + jobManagerRunnerFactory)); } } @@ -262,7 +272,17 @@ private DispatcherRunner createDispatcherRunner() throws Exception { return dispatcherRunnerFactory.createDispatcherRunner( dispatcherLeaderElectionService, fatalErrorHandler, - () -> jobGraphStore, + new JobPersistenceComponentFactory() { + @Override + public JobGraphStore createJobGraphStore() { + return jobGraphStore; + } + + @Override + public JobResultStore createJobResultStore() { + return jobResultStore; + } + }, TestingUtils.defaultExecutor(), rpcServiceResource.getTestingRpcService(), partialDispatcherServices); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java index 8592f612e0369..11c1abf19363a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java @@ -22,11 +22,13 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testutils.TestingJobGraphStore; +import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; import org.apache.flink.util.ExceptionUtils; @@ -34,8 +36,8 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.QuintFunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.apache.flink.util.function.TriFunctionWithException; import org.junit.After; import org.junit.AfterClass; @@ -73,6 +75,7 @@ public class SessionDispatcherLeaderProcessTest extends TestLogger { private TestingFatalErrorHandler fatalErrorHandler; private JobGraphStore jobGraphStore; + private JobResultStore jobResultStore; private TestingDispatcherServiceFactory dispatcherServiceFactory; @@ -85,6 +88,7 @@ public static void setupClass() { public void setup() { fatalErrorHandler = new TestingFatalErrorHandler(); jobGraphStore = TestingJobGraphStore.newBuilder().build(); + jobResultStore = TestingJobResultStore.builder().build(); dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder().build(); } @@ -128,7 +132,11 @@ public void start_triggersJobGraphRecoveryAndDispatcherServiceCreation() throws dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (fencingToken, recoveredJobGraphs, jobGraphStore) -> { + (fencingToken, + recoveredJobGraphs, + jobResults, + jobGraphStore, + jobResultStore) -> { recoveredJobGraphsFuture.complete(recoveredJobGraphs); return TestingDispatcherGatewayService.newBuilder().build(); }) @@ -160,7 +168,7 @@ public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception { dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> TestingDispatcherGatewayService.newBuilder() .setTerminationFuture( dispatcherServiceTerminationFuture) @@ -196,7 +204,7 @@ public void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHa dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> TestingDispatcherGatewayService.newBuilder() .setTerminationFuture(terminationFuture) .build()) @@ -221,7 +229,7 @@ public void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHa dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> TestingDispatcherGatewayService.newBuilder() .setTerminationFuture(terminationFuture) .withManualTerminationFutureCompletion() @@ -250,8 +258,8 @@ public void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenSta dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - TriFunctionWithException.unchecked( - (ignoredA, ignoredB, ignoredC) -> { + QuintFunctionWithException.unchecked( + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> { createDispatcherServiceLatch.await(); return TestingDispatcherGatewayService.newBuilder() .setDispatcherGateway(dispatcherGateway) @@ -293,7 +301,7 @@ public void closeAsync_duringJobRecovery_preventsDispatcherServiceCreation() thr this.dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (ignoredA, ignoredB, ignoredC) -> { + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> { createDispatcherServiceLatch.trigger(); return TestingDispatcherGatewayService.newBuilder().build(); }) @@ -337,8 +345,11 @@ public void onRemovedJobGraph_terminatesRunningJob() throws Exception { dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (dispatcherId, jobGraphs, jobGraphWriter) -> - testingDispatcherService) + (dispatcherId, + jobGraphs, + globallyTerminatedJobs, + jobGraphWriter, + jobResultStore) -> testingDispatcherService) .build(); try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess = @@ -369,8 +380,11 @@ public void onRemovedJobGraph_failingRemovalCall_failsFatally() throws Exception dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (dispatcherId, jobGraphs, jobGraphWriter) -> - testingDispatcherService) + (dispatcherId, + jobGraphs, + globallyTerminatedJobs, + jobGraphWriter, + jobResultStore) -> testingDispatcherService) .build(); try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess = @@ -588,7 +602,11 @@ private void runOnAddedJobGraphTest( dispatcherServiceFactory = TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (dispatcherId, jobGraphs, jobGraphWriter) -> { + (dispatcherId, + jobGraphs, + globallyTerminatedJobs, + jobGraphWriter, + jobResultStore) -> { assertThat(jobGraphs, containsInAnyOrder(JOB_GRAPH)); return TestingDispatcherGatewayService.newBuilder() @@ -623,7 +641,7 @@ private TestingDispatcherServiceFactory createDispatcherServiceFactoryFor( TestingDispatcherGateway testingDispatcherGateway) { return TestingDispatcherServiceFactory.newBuilder() .setCreateFunction( - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> TestingDispatcherGatewayService.newBuilder() .setDispatcherGateway(testingDispatcherGateway) .build()) @@ -635,6 +653,7 @@ private SessionDispatcherLeaderProcess createDispatcherLeaderProcess() { leaderSessionId, dispatcherServiceFactory, jobGraphStore, + jobResultStore, ioExecutor, fatalErrorHandler); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherServiceFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherServiceFactory.java index 5efacdafeab5a..3dcd8bc8a1369 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherServiceFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherServiceFactory.java @@ -19,26 +19,33 @@ package org.apache.flink.runtime.dispatcher.runner; import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; -import org.apache.flink.util.function.TriFunction; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.function.QuintFunction; import java.util.Collection; class TestingDispatcherServiceFactory implements AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory { - private final TriFunction< + + private final QuintFunction< DispatcherId, Collection, + Collection, JobGraphWriter, + JobResultStore, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction; private TestingDispatcherServiceFactory( - TriFunction< + QuintFunction< DispatcherId, Collection, + Collection, JobGraphWriter, + JobResultStore, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction) { this.createFunction = createFunction; @@ -48,8 +55,15 @@ private TestingDispatcherServiceFactory( public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( DispatcherId fencingToken, Collection recoveredJobs, - JobGraphWriter jobGraphWriter) { - return createFunction.apply(fencingToken, recoveredJobs, jobGraphWriter); + Collection globallyTerminatedJobs, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore) { + return createFunction.apply( + fencingToken, + recoveredJobs, + globallyTerminatedJobs, + jobGraphWriter, + jobResultStore); } public static Builder newBuilder() { @@ -57,22 +71,26 @@ public static Builder newBuilder() { } public static class Builder { - private TriFunction< + private QuintFunction< DispatcherId, Collection, + Collection, JobGraphWriter, + JobResultStore, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction = - (ignoredA, ignoredB, ignoredC) -> + (ignoredA, ignoredB, ignoredC, ignoredD, ignoredE) -> TestingDispatcherGatewayService.newBuilder().build(); private Builder() {} Builder setCreateFunction( - TriFunction< + QuintFunction< DispatcherId, Collection, + Collection, JobGraphWriter, + JobResultStore, AbstractDispatcherLeaderProcess.DispatcherGatewayService> createFunction) { this.createFunction = createFunction; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index 70dee1e54f780..47f5e6565c11e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; @@ -42,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.JobGraphStore; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -179,7 +180,17 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception { createDispatcherRunner( rpcService, dispatcherLeaderElectionService, - () -> createZooKeeperJobGraphStore(client), + new JobPersistenceComponentFactory() { + @Override + public JobGraphStore createJobGraphStore() { + return createZooKeeperJobGraphStore(client); + } + + @Override + public JobResultStore createJobResultStore() { + return new EmbeddedJobResultStore(); + } + }, partialDispatcherServices, defaultDispatcherRunnerFactory)) { @@ -226,14 +237,14 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception { private DispatcherRunner createDispatcherRunner( TestingRpcService rpcService, TestingLeaderElectionService dispatcherLeaderElectionService, - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, PartialDispatcherServices partialDispatcherServices, DispatcherRunnerFactory dispatcherRunnerFactory) throws Exception { return dispatcherRunnerFactory.createDispatcherRunner( dispatcherLeaderElectionService, fatalErrorHandler, - jobGraphStoreFactory, + jobPersistenceComponentFactory, TestingUtils.defaultExecutor(), rpcService, partialDispatcherServices); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java index cf53a686c4980..b865fc414c8a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder; -import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory; +import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; @@ -339,7 +339,7 @@ private TestingDispatcherRunnerFactory( public DispatcherRunner createDispatcherRunner( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, - JobGraphStoreFactory jobGraphStoreFactory, + JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java index 9dc17dff3ddf2..619f4d5f7e894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -102,6 +103,7 @@ public SchedulerNG createInstance( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java deleted file mode 100644 index 23073aa55a0c9..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** Tests for the {@link SchedulerUtils} utilities. */ -public class SchedulerUtilsTest extends TestLogger { - - @Test - public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { - - final int maxNumberOfCheckpointsToRetain = 10; - final Configuration jobManagerConfig = new Configuration(); - jobManagerConfig.setInteger( - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain); - - final CompletedCheckpointStore completedCheckpointStore = - SchedulerUtils.createCompletedCheckpointStore( - jobManagerConfig, - getClass().getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - log, - new JobID()); - - assertEquals( - maxNumberOfCheckpointsToRetain, - completedCheckpointStore.getMaxNumberOfRetainedCheckpoints()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java index 40a122e46f597..be511ec809f64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -59,6 +60,7 @@ public SchedulerNG createInstance( ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, + CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java index 4c31f78554699..e9a797f0f47f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java @@ -141,6 +141,7 @@ public static Builder newBuilder() { return new Builder(); } + /** {@code Builder} for creating {@code TestingJobGraphStore} instances. */ public static class Builder { private ThrowingConsumer startConsumer = ignored -> {}; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java new file mode 100644 index 0000000000000..d4231794cdc0e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.JobResultStore; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +/** + * {@code TestingJobResultStore} is a {@link JobResultStore} implementation that can be used in + * tests. + */ +public class TestingJobResultStore implements JobResultStore { + + private final ThrowingConsumer createDirtyResultConsumer; + private final ThrowingConsumer markResultAsCleanConsumer; + private final FunctionWithException + hasJobResultEntryFunction; + private final SupplierWithException, ? extends IOException> + getDirtyResultsSupplier; + + private TestingJobResultStore( + ThrowingConsumer createDirtyResultConsumer, + ThrowingConsumer markResultAsCleanConsumer, + FunctionWithException hasJobResultEntryFunction, + SupplierWithException, ? extends IOException> + getDirtyResultsSupplier) { + this.createDirtyResultConsumer = createDirtyResultConsumer; + this.markResultAsCleanConsumer = markResultAsCleanConsumer; + this.hasJobResultEntryFunction = hasJobResultEntryFunction; + this.getDirtyResultsSupplier = getDirtyResultsSupplier; + } + + @Override + public void createDirtyResult(JobResult jobResult) throws IOException { + createDirtyResultConsumer.accept(jobResult); + } + + @Override + public void markResultAsClean(JobID jobId) throws IOException { + markResultAsCleanConsumer.accept(jobId); + } + + @Override + public boolean hasJobResultEntry(JobID jobId) throws IOException { + return hasJobResultEntryFunction.apply(jobId); + } + + @Override + public Collection getDirtyResults() throws IOException { + return getDirtyResultsSupplier.get(); + } + + public static TestingJobResultStore.Builder builder() { + return new Builder(); + } + + /** {@code Builder} for instantiating {@code TestingJobResultStore} instances. */ + public static class Builder { + + private ThrowingConsumer createDirtyResultConsumer = + ignored -> {}; + private ThrowingConsumer markResultAsCleanConsumer = + ignored -> {}; + private FunctionWithException + hasJobResultEntryFunction = ignored -> false; + private SupplierWithException, ? extends IOException> + getDirtyResultsSupplier = Collections::emptyList; + + public Builder withCreateDirtyResultConsumer( + ThrowingConsumer createDirtyResultConsumer) { + this.createDirtyResultConsumer = createDirtyResultConsumer; + return this; + } + + public Builder withMarkResultAsCleanConsumer( + ThrowingConsumer markResultAsCleanConsumer) { + this.markResultAsCleanConsumer = markResultAsCleanConsumer; + return this; + } + + public Builder withHasJobResultEntryFunction( + FunctionWithException + hasJobResultEntryFunction) { + this.hasJobResultEntryFunction = hasJobResultEntryFunction; + return this; + } + + public Builder withGetDirtyResultsSupplier( + SupplierWithException, ? extends IOException> + getDirtyResultsSupplier) { + this.getDirtyResultsSupplier = getDirtyResultsSupplier; + return this; + } + + public TestingJobResultStore build() { + return new TestingJobResultStore( + createDirtyResultConsumer, + markResultAsCleanConsumer, + hasJobResultEntryFunction, + getDirtyResultsSupplier); + } + } +}