[FLINK-11662] Disable task to fail on checkpoint errors #8745

Merged (4 commits) on Jun 27, 2019

Changes from 3 commits

@@ -154,7 +154,10 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
/** This flag defines if we use compression for the state snapshot data or not. Default: false */
private boolean useSnapshotCompression = false;

/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
/**
* @deprecated Should no longer be used because tasks are no longer allowed to fail directly on checkpoint errors.
*/
@Deprecated
private boolean failTaskOnCheckpointError = true;

/** The default input dependency constraint to schedule tasks. */
@@ -948,20 +951,22 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
}

/**
* This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
* the task. This should not be called by the user, please use CheckpointConfig.isFailTaskOnCheckpointError()
* instead.
* @deprecated This method has no effect because the configuration is no longer forwarded from the checkpoint config
* to the task, and tasks no longer fail directly on checkpoint errors.
* Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to query the behaviour on checkpoint errors.
*/
@Deprecated
@Internal
public boolean isFailTaskOnCheckpointError() {
return failTaskOnCheckpointError;
}

/**
* This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
* the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
* instead.
* @deprecated This method has no effect because the configuration is no longer forwarded from the checkpoint config
* to the task, and tasks no longer fail directly on checkpoint errors.
* Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behaviour on checkpoint errors.
*/
@Deprecated
@Internal
public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
this.failTaskOnCheckpointError = failTaskOnCheckpointError;
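
A minimal migration sketch for user code affected by this deprecation, assuming a streaming job configured through StreamExecutionEnvironment; the class name and interval value below are illustrative, not part of this PR:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointErrorMigrationExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // illustrative checkpoint interval

        // Before this PR (deprecated; the flag is no longer forwarded to the task):
        // env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

        // After this PR: the job manager fails the job on the first declined checkpoint.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    }
}
```
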
@@ -81,7 +81,7 @@
* <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
* <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li>
* <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
* <li>environment.fail_on_checkpointing_errors (String, default - true): Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.</li>
* <li>environment.tolerable_declined_checkpoint_number (int, default - 0): Sets how many declined checkpoints from tasks the job manager tolerates before failing the job.</li>
* <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
* <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
* <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
@@ -150,9 +150,9 @@ public class DataStreamAllroundTestJobFactory {
.key("environment.externalize_checkpoint.cleanup")
.defaultValue("retain");

private static final ConfigOption<Boolean> ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS = ConfigOptions
.key("environment.fail_on_checkpointing_errors")
.defaultValue(true);
private static final ConfigOption<Integer> ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER = ConfigOptions
.key("environment.tolerable_declined_checkpoint_number ")
.defaultValue(0);

private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
.key("environment.parallelism")
@@ -272,12 +272,12 @@ private static void setupCheckpointing(final StreamExecutionEnvironment env, fin
throw new IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig);
}
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
}

final boolean failOnCheckpointingErrors = pt.getBoolean(
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.key(),
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.defaultValue());
env.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
final int tolerableDeclinedCheckpointNumber = pt.getInt(
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.key(),
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.defaultValue());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
}
}

private static void setupParallelism(final StreamExecutionEnvironment env, final ParameterTool pt) {
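
A small sketch of how this end-to-end test option would be supplied, assuming the usual ParameterTool-style arguments; the class name and the value 3 are illustrative:

```java
import org.apache.flink.api.java.utils.ParameterTool;

public class TolerableDeclinedCheckpointOptionExample {
    public static void main(String[] args) {
        // Hypothetical arguments for the allround test job, using the option key defined above.
        ParameterTool pt = ParameterTool.fromArgs(new String[] {
            "--environment.tolerable_declined_checkpoint_number", "3"});
        int tolerable = pt.getInt("environment.tolerable_declined_checkpoint_number", 0);
        System.out.println("Job manager tolerates " + tolerable + " declined checkpoints before failing the job.");
        // setupCheckpointing(...) forwards this value to CheckpointConfig#setTolerableCheckpointFailureNumber(int).
    }
}
```
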
@@ -31,18 +31,24 @@
*/
public class CheckpointFailureManager {

private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
public final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
public final static int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;

private final int tolerableCpFailureNumber;
private final FailJobCallback failureCallback;
private final AtomicInteger continuousFailureCounter;
private final Set<Long> countedCheckpointIds;

public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
checkArgument(tolerableCpFailureNumber >= 0,
checkArgument(tolerableCpFailureNumber >= 0 || tolerableCpFailureNumber == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER,
"The tolerable checkpoint failure number is illegal, " +
"it must be greater than or equal to 0 .");
this.tolerableCpFailureNumber = tolerableCpFailureNumber;
// Set tolerableCpFailureNumber to 0 when UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER is passed.
if (tolerableCpFailureNumber == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
this.tolerableCpFailureNumber = 0;
} else {
this.tolerableCpFailureNumber = tolerableCpFailureNumber;
}
this.continuousFailureCounter = new AtomicInteger(0);
this.failureCallback = checkNotNull(failureCallback);
this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
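
A self-contained restatement of the normalization rule the constructor now applies; the helper class and method names are illustrative, not part of the PR:

```java
public final class TolerableFailureNumberSketch {
    static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;

    // Mirrors the constructor logic above: the UNDEFINED sentinel is accepted and mapped to 0
    // (fail the job on the first counted checkpoint failure); any other negative value is rejected.
    static int normalize(int configured) {
        if (configured < 0 && configured != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
            throw new IllegalArgumentException(
                "The tolerable checkpoint failure number must be >= 0 or the UNDEFINED sentinel.");
        }
        return configured == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER ? 0 : configured;
    }

    public static void main(String[] args) {
        System.out.println(normalize(UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER)); // 0
        System.out.println(normalize(5));                                     // 5
    }
}
```
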
@@ -25,6 +25,8 @@
import java.io.Serializable;
import java.util.Objects;

import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;

/**
* Configuration settings for the {@link CheckpointCoordinator}. This includes the checkpoint
* interval, the checkpoint timeout, the pause between checkpoints, the maximum number of
@@ -65,13 +67,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
boolean isPerfetCheckpointForRecovery,
boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber) {

// sanity checks
if (checkpointInterval < 10 || checkpointTimeout < 10 ||
minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 ||
tolerableCpFailureNumber < 0) {
(tolerableCpFailureNumber < 0 && tolerableCpFailureNumber != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER)) {
throw new IllegalArgumentException();
}

@@ -81,7 +83,7 @@ public CheckpointCoordinatorConfiguration(
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery;
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
}

@@ -23,7 +23,12 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -34,6 +39,8 @@ public class CheckpointConfig implements java.io.Serializable {

private static final long serialVersionUID = -750378776078908147L;

private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);

/** The default checkpoint mode: exactly once. */
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;

@@ -69,14 +76,25 @@ public class CheckpointConfig implements java.io.Serializable {
/** Cleanup behaviour for persistent checkpoints. */
private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;

/** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */
/**
* Tasks no longer fail if there is an error in their checkpointing.
*
* <p>{@link #tolerableCheckpointFailureNumber} always overrules this deprecated field when the two conflict.
*
* @deprecated Use {@link #tolerableCheckpointFailureNumber}.
*/
@Deprecated
private boolean failOnCheckpointingErrors = true;

/** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/
private boolean preferCheckpointForRecovery = false;

/** Determines the threshold that we tolerance checkpoint failure number. */
private int tolerableCheckpointFailureNumber = 0;
/**
* Determines how many declined checkpoints are tolerated before the whole job is failed.
* -1 means the value has not been set via {@link #setTolerableCheckpointFailureNumber(int)}; in that case the job
* still fails once a checkpoint is declined.
*/
private int tolerableCheckpointFailureNumber = UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;

// ------------------------------------------------------------------------

@@ -239,20 +257,44 @@ public void setForceCheckpointing(boolean forceCheckpointing) {
}

/**
* This determines the behaviour of tasks if there is an error in their local checkpointing. If this returns true,
* tasks will fail as a reaction. If this returns false, task will only decline the failed checkpoint.
* This determines the behaviour when a checkpoint error occurs.
* If this returns true, which is equivalent to a tolerableCheckpointFailureNumber of zero, the job manager
* fails the whole job as soon as it receives a declined checkpoint message.
* If this returns false, which is equivalent to a tolerableCheckpointFailureNumber of Integer.MAX_VALUE (i.e. unlimited),
* the job manager never fails the whole job, no matter how many declined checkpoints it receives.
*
* @deprecated Use {@link #getTolerableCheckpointFailureNumber()}.
*/
@Deprecated
public boolean isFailOnCheckpointingErrors() {
return failOnCheckpointingErrors;
}

/**
* Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.
* If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only
* decline a the checkpoint and continue running. The default is true.
* Sets the expected behaviour when a task encounters an error while checkpointing.
* If this is set to true, which is equivalent to setting tolerableCheckpointFailureNumber to zero, the job manager
* fails the whole job as soon as it receives a declined checkpoint message.
* If this is set to false, which is equivalent to setting tolerableCheckpointFailureNumber to Integer.MAX_VALUE (i.e. unlimited),
* the job manager never fails the whole job, no matter how many declined checkpoints it receives.
*
* <p>{@link #setTolerableCheckpointFailureNumber(int)} always overrules this deprecated method when the two conflict.
*
* @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}.
*/
@Deprecated
public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
if (tolerableCheckpointFailureNumber != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
LOG.warn("Since tolerableCheckpointFailureNumber has been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) " +
"method would not take any effect and please use #setTolerableCheckpointFailureNumber(int) method to " +
"determine your expected behaviour when checkpoint errors on task side.", tolerableCheckpointFailureNumber);
return;
}
this.failOnCheckpointingErrors = failOnCheckpointingErrors;
if (failOnCheckpointingErrors) {
this.tolerableCheckpointFailureNumber = 0;
} else {
this.tolerableCheckpointFailureNumber = UNLIMITED_TOLERABLE_FAILURE_NUMBER;
}
}

/**
@@ -268,6 +310,9 @@ public int getTolerableCheckpointFailureNumber() {
* we do not tolerate any checkpoint failure.
*/
public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {
if (tolerableCheckpointFailureNumber < 0) {
throw new IllegalArgumentException("The tolerable failure checkpoint number must be non-negative.");
}
this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber;
}

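
To make the equivalences described in the Javadoc above concrete, a brief usage sketch against CheckpointConfig; the class name and interval value are illustrative:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailureExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // illustrative checkpoint interval
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Equivalent to the deprecated setFailOnCheckpointingErrors(true):
        checkpointConfig.setTolerableCheckpointFailureNumber(0);

        // Equivalent to the deprecated setFailOnCheckpointingErrors(false):
        checkpointConfig.setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

        // Or anything in between, e.g. tolerate up to three declined checkpoints:
        checkpointConfig.setTolerableCheckpointFailureNumber(3);
    }
}
```
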
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.graph;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -606,11 +605,7 @@ private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();

long interval = cfg.getCheckpointInterval();
if (interval >= 10) {
ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// propagate the expected behaviour for checkpoint errors to task.
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
} else {
if (interval < 10) {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
}
@@ -32,6 +32,8 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -419,7 +421,7 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
throw new Exception(snapshotFailMessage, snapshotException);
throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}

return snapshotInProgress;
@@ -29,32 +29,12 @@
public class CheckpointExceptionHandlerFactory {

/**
* Returns a {@link CheckpointExceptionHandler} that either causes a task to fail completely or to just declines
* checkpoint on exception, depending on the parameter flag.
* Returns a {@link CheckpointExceptionHandler} that simply declines the checkpoint on exception.
*/
public CheckpointExceptionHandler createCheckpointExceptionHandler(
boolean failTaskOnCheckpointException,
Environment environment) {

if (failTaskOnCheckpointException) {
return new FailingCheckpointExceptionHandler();
} else {
return new DecliningCheckpointExceptionHandler(environment);
}
}

/**
* This handler makes the task fail by rethrowing a reported exception.
*/
static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {

@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
Exception exception) throws Exception {

throw exception;
}
return new DecliningCheckpointExceptionHandler(environment);
}

/**
@@ -71,7 +51,7 @@ static final class DecliningCheckpointExceptionHandler implements CheckpointExce
@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
Exception exception) throws Exception {
Exception exception) {

environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
}