Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ed29419
[hotfix] Adds missing JavaDoc
XComp Nov 15, 2021
f3b5ddf
[hotfix] Makes intention of comment clearer
XComp Nov 15, 2021
aaef424
[FLINK-11813] Renames JobGraphStoreFactory into JobPersistenceCompone…
XComp Nov 29, 2021
b13d14f
[FLINK-11813] Introduces new method HaServicesJobPersistenceComponent…
XComp Nov 29, 2021
9480c58
[FLINK-11813] Introduces TestingJobResultStore
XComp Nov 29, 2021
0114fdd
[FLINK-11813] Make JobResultStore available to SessionDispatcherLeade…
XComp Nov 29, 2021
2e56822
[FLINK-11813] Adds mapping from ApplicationStatus to JobStatus
XComp Nov 29, 2021
23d4a86
[FLINK-11813] Integrated JobResult handling into SessionDispatcherLea…
XComp Nov 29, 2021
b9e84aa
[FLINK-11813] Integrated JobResultStore into DispatcherGatewayService…
XComp Nov 29, 2021
8822c2f
[FLINK-11813] Integrated JobResultStore into DispatcherFactory
XComp Nov 29, 2021
12c227a
[FLINK-11813] Renames PartialDispatcherServicesWithJobGraphStore into…
XComp Nov 29, 2021
95f2c92
[hotfix] Removes @Nonnull annotation from PartialDispatcherServicesWi…
XComp Nov 29, 2021
a9ae6f4
[hotfix] Removes @Nonnull annotations from DispatcherServices
XComp Nov 29, 2021
7d03655
[FLINK-11813] Adds JobResultStore to DispatcherServices
XComp Nov 29, 2021
e2ed7ce
[FLINK-11813] Adds globally-terminated jobs to Dispatcher interface
XComp Nov 29, 2021
b3ac425
[FLINK-11813] Moves CheckpointsCleaner initialization up into Default…
XComp Dec 1, 2021
66f7527
[FLINK-11813] Moves CheckpointsCleaner initialization up into JobMaster
XComp Dec 2, 2021
f758260
[FLINK-11813] Moves CheckpointsCleaner initialization into JobManager…
XComp Dec 2, 2021
d17f7f0
[FLINK-11813] Moves cleanup logic into dedicated methods
XComp Dec 1, 2021
f5e5bfa
[FLINK-11813] Fixes wrong condition for cleanup marking
XComp Dec 1, 2021
0ff1f3a
[FLINK-11813] Adds async cleanup on ioExecutor
XComp Dec 1, 2021
97f392e
[FLINK-11813] Removes unused classloader parameter from CheckpointRec…
XComp Nov 25, 2021
4dee11a
[FLINK-11813] Moves CheckpointRecoveryFactory initialization based on…
XComp Nov 26, 2021
91385e4
[FLINK-11813] Introduces classes for checkpoint-related cleanup
XComp Nov 29, 2021
d8b8782
[FLINK-11813] Integrates checkpoint-related cleanup
XComp Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayService;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -83,7 +85,9 @@ public ApplicationDispatcherGatewayServiceFactory(
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
Collection<JobResult> globallyTerminatedJobs,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {

final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

Expand All @@ -94,6 +98,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
rpcService,
fencingToken,
recoveredJobs,
globallyTerminatedJobs,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
Expand All @@ -102,8 +107,8 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
dispatcherGateway,
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices, jobGraphWriter, jobResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessFactory;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

Expand Down Expand Up @@ -59,7 +59,7 @@ private ApplicationDispatcherLeaderProcessFactoryFactory(

@Override
public DispatcherLeaderProcessFactory createFactory(
JobGraphStoreFactory jobGraphStoreFactory,
JobPersistenceComponentFactory jobPersistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices,
Expand All @@ -74,7 +74,10 @@ public DispatcherLeaderProcessFactory createFactory(
partialDispatcherServices);

return new SessionDispatcherLeaderProcessFactory(
dispatcherServiceFactory, jobGraphStoreFactory, ioExecutor, fatalErrorHandler);
dispatcherServiceFactory,
jobPersistenceComponentFactory,
ioExecutor,
fatalErrorHandler);
}

public static ApplicationDispatcherLeaderProcessFactoryFactory create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.function;

import org.apache.flink.annotation.PublicEvolving;

/**
* Function which takes three arguments.
*
* @param <S> type of the first argument
* @param <T> type of the second argument
* @param <U> type of the third argument
* @param <V> type of the fourth argument
* @param <W> type of the fifth argument
* @param <R> type of the return value
*/
@PublicEvolving
@FunctionalInterface
public interface QuintFunction<S, T, U, V, W, R> {

/**
* Applies this function to the given arguments.
*
* @param s the first function argument
* @param t the second function argument
* @param u the third function argument
* @param v the fourth function argument
* @param w the fifth function argument
* @return the function result
*/
R apply(S s, T t, U u, V v, W w);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.function;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.ExceptionUtils;

/**
* Function which takes five arguments.
*
* @param <S> type of the first argument
* @param <T> type of the second argument
* @param <U> type of the third argument
* @param <V> type of the four argument
* @param <W> type of the five argument
* @param <R> type of the return value
* @param <E> type of the thrown exception
*/
@PublicEvolving
@FunctionalInterface
public interface QuintFunctionWithException<S, T, U, V, W, R, E extends Throwable> {

/**
* Applies this function to the given arguments.
*
* @param s the first function argument
* @param t the second function argument
* @param u the third function argument
* @param v the four function argument
* @param w the five function argument
* @return the function result
* @throws E if it fails
*/
R apply(S s, T t, U u, V v, W w) throws E;

/**
* Convert at {@link QuintFunctionWithException} into a {@link QuintFunction}.
*
* @param quintFunctionWithException function with exception to convert into a function
* @param <A> first input type
* @param <B> second input type
* @param <C> third input type
* @param <D> fourth input type
* @param <E> fifth input type
* @param <R> output type
* @return {@link QuintFunction} which throws all checked exception as an unchecked exception.
*/
static <A, B, C, D, E, R> QuintFunction<A, B, C, D, E, R> unchecked(
QuintFunctionWithException<A, B, C, D, E, R, ?> quintFunctionWithException) {
return (A a, B b, C c, D d, E e) -> {
try {
return quintFunctionWithException.apply(a, b, c, d, e);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
// we need this to appease the compiler :-(
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public KubernetesCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobID, int maxNumberOfCheckpointsToRetain) throws Exception {

final String configMapName = getConfigMapNameFunction.apply(jobID);
return KubernetesUtils.createCompletedCheckpointStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,8 @@ public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
try {
FileUtils.deleteDirectory(jobDir);

// NOTE: Instead of going through blobExpiryTimes, keep lingering entries - they
// NOTE on why blobExpiryTimes are not cleaned up:
// Instead of going through blobExpiryTimes, keep lingering entries - they
// will be cleaned up by the timer task which tolerates non-existing files
// If inserted again with the same IDs (via put()), the TTL will be updated
// again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

import org.slf4j.Logger;

/** A factory for per Job checkpoint recovery components. */
public interface CheckpointRecoveryFactory {
Expand All @@ -30,12 +34,43 @@ public interface CheckpointRecoveryFactory {
*
* @param jobId Job ID to recover checkpoints for
* @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception;
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception;

/**
* Instantiates the {@link CompletedCheckpointStore} based on the passed {@code Configuration}.
*
* @param jobId The {@code JobID} for which the {@code CompletedCheckpointStore} shall be
* created.
* @param config The {@code Configuration} that shall be used (see {@link
* CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}.
* @param logger The logger that shall be used internally.
* @return The {@code CompletedCheckpointStore} instance for the given {@code Job}.
* @throws Exception if an error occurs while instantiating the {@code
* CompletedCheckpointStore}.
*/
default CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, Configuration config, Logger logger) throws Exception {
int maxNumberOfCheckpointsToRetain =
config.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

if (maxNumberOfCheckpointsToRetain <= 0) {
// warning and use 1 as the default value if the setting in
// state.checkpoints.max-retained-checkpoints is not greater than 0.
logger.warn(
"The setting for '{} : {}' is invalid. Using default value of {}",
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
maxNumberOfCheckpointsToRetain,
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

maxNumberOfCheckpointsToRetain =
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
}

return this.createRecoveredCompletedCheckpointStore(jobId, maxNumberOfCheckpointsToRetain);
}

/**
* Creates a {@link CheckpointIDCounter} instance for a job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public PerJobCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
JobID jobId, int maxNumberOfCheckpointsToRetain) {
return store.compute(
jobId,
(key, previous) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception {

return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public ZooKeeperCheckpointRecoveryFactory(

@Override
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
JobID jobId, int maxNumberOfCheckpointsToRetain) throws Exception {

return ZooKeeperUtils.createCompletedCheckpoints(
ZooKeeperUtils.useNamespaceAndEnsurePath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava30.com.google.common.collect.BiMap;
import org.apache.flink.shaded.guava30.com.google.common.collect.HashBiMap;

/** The status of an application. */
public enum ApplicationStatus {
Expand All @@ -37,6 +41,16 @@ public enum ApplicationStatus {

// ------------------------------------------------------------------------

private static final BiMap<JobStatus, ApplicationStatus> JOB_STATUS_APPLICATION_STATUS_BI_MAP =
HashBiMap.create();

static {
// only globally-terminated JobStatus have a corresponding ApplicationStatus
JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FAILED, ApplicationStatus.FAILED);
JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
JOB_STATUS_APPLICATION_STATUS_BI_MAP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
}

/** The associated process exit code */
private final int processExitCode;

Expand All @@ -59,20 +73,20 @@ public int processExitCode() {
* #UNKNOWN}.
*/
public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
if (jobStatus == null) {
if (jobStatus == null || !JOB_STATUS_APPLICATION_STATUS_BI_MAP.containsKey(jobStatus)) {
return UNKNOWN;
} else {
switch (jobStatus) {
case FAILED:
return FAILED;
case CANCELED:
return CANCELED;
case FINISHED:
return SUCCEEDED;

default:
return UNKNOWN;
}
}

return JOB_STATUS_APPLICATION_STATUS_BI_MAP.get(jobStatus);
}

public static JobStatus fromApplicationStatus(ApplicationStatus applicationStatus) {
Preconditions.checkNotNull(applicationStatus, "ApplicationStatus must not be null");
if (JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().containsKey(applicationStatus)) {
throw new UnsupportedOperationException(
applicationStatus.name() + " cannot be mapped to a JobStatus.");
}

return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(applicationStatus);
}
}
Loading