[FLINK-11662] Disable task to fail on checkpoint errors
This closes #8745.
Myasuka authored and StefanRRichter committed Jun 27, 2019
1 parent 0004056 commit b760d55
Showing 13 changed files with 143 additions and 255 deletions.
@@ -154,7 +154,10 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
/** This flag defines if we use compression for the state snapshot data or not. Default: false */
private boolean useSnapshotCompression = false;

/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
/**
* @deprecated Should no longer be used, because tasks no longer fail directly on checkpoint errors.
*/
@Deprecated
private boolean failTaskOnCheckpointError = true;

/** The default input dependency constraint to schedule tasks. */
@@ -948,20 +951,22 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
}

/**
* This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
* the task. This should not be called by the user, please use CheckpointConfig.isFailTaskOnCheckpointError()
* instead.
* @deprecated This method has no effect, since the configuration is no longer forwarded from the checkpoint config
* to the task and tasks no longer fail directly on checkpoint errors.
* Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to query the behaviour on checkpoint errors.
*/
@Deprecated
@Internal
public boolean isFailTaskOnCheckpointError() {
return failTaskOnCheckpointError;
}

/**
* This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
* the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
* instead.
* @deprecated This method has no effect, since the configuration is no longer forwarded from the checkpoint config
* to the task and tasks no longer fail directly on checkpoint errors.
* Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behaviour on checkpoint errors.
*/
@Deprecated
@Internal
public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
this.failTaskOnCheckpointError = failTaskOnCheckpointError;
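The deprecation notes above only point at the replacement API, so here is a minimal, hedged migration sketch (not part of the commit; the checkpoint interval and the tolerance value of 3 are arbitrary example numbers): user code that used to toggle the task-side flag on the ExecutionConfig would now set a tolerable failure count on the CheckpointConfig.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class TolerableFailureMigrationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // example interval in milliseconds, not taken from this commit

        // Previously: env.getConfig().setFailTaskOnCheckpointError(false); -- now a deprecated no-op.
        // Instead, let the job manager tolerate a bounded number of declined checkpoints:
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}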
@@ -81,7 +81,7 @@
* <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
* <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li>
* <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
* <li>environment.fail_on_checkpointing_errors (String, default - true): Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.</li>
* <li>environment.tolerable_declined_checkpoint_number (int, default - 0): Sets the expected behaviour for the job manager in case it receives declined checkpoints from tasks.</li>
* <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
* <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
* <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
@@ -150,9 +150,9 @@ public class DataStreamAllroundTestJobFactory {
.key("environment.externalize_checkpoint.cleanup")
.defaultValue("retain");

private static final ConfigOption<Boolean> ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS = ConfigOptions
.key("environment.fail_on_checkpointing_errors")
.defaultValue(true);
private static final ConfigOption<Integer> ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER = ConfigOptions
.key("environment.tolerable_declined_checkpoint_number ")
.defaultValue(0);

private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
.key("environment.parallelism")
@@ -272,12 +272,12 @@ private static void setupCheckpointing(final StreamExecutionEnvironment env, fin
throw new IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig);
}
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
}

final boolean failOnCheckpointingErrors = pt.getBoolean(
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.key(),
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.defaultValue());
env.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
final int tolerableDeclinedCheckpointNumber = pt.getInt(
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.key(),
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.defaultValue());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
}
}

private static void setupParallelism(final StreamExecutionEnvironment env, final ParameterTool pt) {
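For illustration only, a small sketch of how the renamed option could be supplied to the test job and read back through ParameterTool, mirroring setupCheckpointing above (the value 3 and the class name are assumptions, not part of the commit):

import org.apache.flink.api.java.utils.ParameterTool;

public final class TolerableDeclinedCheckpointArgSketch {
    public static void main(String[] args) {
        // e.g. passed on the command line as: --environment.tolerable_declined_checkpoint_number 3
        ParameterTool pt = ParameterTool.fromArgs(
            new String[] {"--environment.tolerable_declined_checkpoint_number", "3"});
        int tolerable = pt.getInt("environment.tolerable_declined_checkpoint_number", 0);
        System.out.println("tolerable declined checkpoints: " + tolerable);
    }
}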
@@ -31,7 +31,7 @@
*/
public class CheckpointFailureManager {

private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;

private final int tolerableCpFailureNumber;
private final FailJobCallback failureCallback;
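Since the diff only shows the constant being made public, here is a hedged sketch of the kind of threshold check such a failure manager can perform; it is not the actual CheckpointFailureManager implementation, and the class and callback names below are made up for illustration.

import java.util.concurrent.atomic.AtomicInteger;

final class TolerableFailureCounterSketch {
    private final int tolerableCpFailureNumber;        // e.g. CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER
    private final Runnable failJobCallback;            // stand-in for the real FailJobCallback
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);

    TolerableFailureCounterSketch(int tolerableCpFailureNumber, Runnable failJobCallback) {
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
        this.failJobCallback = failJobCallback;
    }

    void handleDeclinedCheckpoint() {
        // Fail the job only once more checkpoints have been declined than the threshold tolerates.
        if (continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
            failJobCallback.run();
        }
    }

    void handleSuccessfulCheckpoint() {
        continuousFailureCounter.set(0); // a successful checkpoint resets the failure streak
    }
}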
@@ -65,7 +65,7 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
boolean isPerfetCheckpointForRecovery,
boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber) {

// sanity checks
@@ -81,7 +81,7 @@ public CheckpointCoordinatorConfiguration(
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery;
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
}

@@ -23,7 +23,11 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -34,6 +38,8 @@ public class CheckpointConfig implements java.io.Serializable {

private static final long serialVersionUID = -750378776078908147L;

private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);

/** The default checkpoint mode: exactly once. */
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;

@@ -46,6 +52,8 @@ public class CheckpointConfig implements java.io.Serializable {
/** The default limit of concurrently happening checkpoints: one. */
public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;

public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;

// ------------------------------------------------------------------------

/** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -69,14 +77,24 @@ public class CheckpointConfig implements java.io.Serializable {
/** Cleanup behaviour for persistent checkpoints. */
private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;

/** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */
/**
* Tasks no longer fail if there is an error in their checkpointing.
*
* <p>{@link #tolerableCheckpointFailureNumber} always overrules this deprecated field whenever the two conflict.
*
* @deprecated Use {@link #tolerableCheckpointFailureNumber}.
*/
@Deprecated
private boolean failOnCheckpointingErrors = true;

/** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/
private boolean preferCheckpointForRecovery = false;

/** Determines the threshold that we tolerance checkpoint failure number. */
private int tolerableCheckpointFailureNumber = 0;
/**
* Determines the number of declined checkpoints that the job tolerates before failing.
* The default value of -1 means the threshold is undefined, i.e. it was not set via {@link #setTolerableCheckpointFailureNumber(int)}.
*/
private int tolerableCheckpointFailureNumber = UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;

// ------------------------------------------------------------------------

@@ -239,27 +257,57 @@ public void setForceCheckpointing(boolean forceCheckpointing) {
}

/**
* This determines the behaviour of tasks if there is an error in their local checkpointing. If this returns true,
* tasks will fail as a reaction. If this returns false, task will only decline the failed checkpoint.
* This determines the behaviour when a checkpoint error occurs.
* If this returns true, which is equivalent to a tolerableCheckpointFailureNumber of zero, the job manager
* fails the whole job as soon as it receives a single decline-checkpoint message.
* If this returns false, which is equivalent to a tolerableCheckpointFailureNumber of Integer.MAX_VALUE (i.e. unlimited),
* the job manager never fails the whole job, no matter how many declined checkpoints it receives.
*
* @deprecated Use {@link #getTolerableCheckpointFailureNumber()}.
*/
@Deprecated
public boolean isFailOnCheckpointingErrors() {
return failOnCheckpointingErrors;
}

/**
* Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure.
* If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only
* decline a the checkpoint and continue running. The default is true.
* Sets the expected behaviour for tasks in case they encounter an error while checkpointing.
* If this is set to true, which is equivalent to setting tolerableCheckpointFailureNumber to zero, the job manager
* fails the whole job as soon as it receives a single decline-checkpoint message.
* If this is set to false, which is equivalent to setting tolerableCheckpointFailureNumber to Integer.MAX_VALUE (i.e. unlimited),
* the job manager never fails the whole job, no matter how many declined checkpoints it receives.
*
* <p>{@link #setTolerableCheckpointFailureNumber(int)} always overrules this deprecated method whenever the two conflict.
*
* @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}.
*/
@Deprecated
public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
if (tolerableCheckpointFailureNumber != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
LOG.warn("Since tolerableCheckpointFailureNumber has been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) " +
"method would not take any effect and please use #setTolerableCheckpointFailureNumber(int) method to " +
"determine your expected behaviour when checkpoint errors on task side.", tolerableCheckpointFailureNumber);
return;
}
this.failOnCheckpointingErrors = failOnCheckpointingErrors;
if (failOnCheckpointingErrors) {
this.tolerableCheckpointFailureNumber = 0;
} else {
this.tolerableCheckpointFailureNumber = UNLIMITED_TOLERABLE_FAILURE_NUMBER;
}
}

/**
* Get the tolerable checkpoint failure number, which is used by the checkpoint failure manager
* to determine when the job needs to be failed.
*
* <p>If {@link #tolerableCheckpointFailureNumber} has not been configured, this method returns 0,
* which means the checkpoint failure manager does not tolerate any declined checkpoint.
*/
public int getTolerableCheckpointFailureNumber() {
if (tolerableCheckpointFailureNumber == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
return 0;
}
return tolerableCheckpointFailureNumber;
}

@@ -268,6 +316,9 @@ public int getTolerableCheckpointFailureNumber() {
* we do not tolerate any checkpoint failure.
*/
public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) {
if (tolerableCheckpointFailureNumber < 0) {
throw new IllegalArgumentException("The tolerable failure checkpoint number must be non-negative.");
}
this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber;
}
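To make the equivalence described in the Javadoc above concrete, a short hedged sketch (assuming a fresh CheckpointConfig per case; the value 5 is arbitrary): the deprecated boolean maps onto the two extreme thresholds, and an explicitly configured threshold always wins.

import org.apache.flink.streaming.api.environment.CheckpointConfig;

public final class TolerableFailureSemanticsSketch {
    public static void main(String[] args) {
        CheckpointConfig a = new CheckpointConfig();
        a.setFailOnCheckpointingErrors(true);                          // equivalent to a threshold of 0
        System.out.println(a.getTolerableCheckpointFailureNumber());   // prints 0

        CheckpointConfig b = new CheckpointConfig();
        b.setFailOnCheckpointingErrors(false);                         // equivalent to an unlimited threshold
        System.out.println(b.getTolerableCheckpointFailureNumber());   // prints Integer.MAX_VALUE

        CheckpointConfig c = new CheckpointConfig();
        c.setTolerableCheckpointFailureNumber(5);
        c.setFailOnCheckpointingErrors(true);                          // logs a warning and is ignored
        System.out.println(c.getTolerableCheckpointFailureNumber());   // still prints 5
    }
}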

@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.graph;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -606,11 +605,7 @@ private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();

long interval = cfg.getCheckpointInterval();
if (interval >= 10) {
ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// propagate the expected behaviour for checkpoint errors to task.
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
} else {
if (interval < 10) {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
}
@@ -32,6 +32,8 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -419,7 +421,7 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
throw new Exception(snapshotFailMessage, snapshotException);
throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}

return snapshotInProgress;
@@ -29,32 +29,12 @@
public class CheckpointExceptionHandlerFactory {

/**
* Returns a {@link CheckpointExceptionHandler} that either causes a task to fail completely or to just declines
* checkpoint on exception, depending on the parameter flag.
* Returns a {@link CheckpointExceptionHandler} that just declines the checkpoint on exception.
*/
public CheckpointExceptionHandler createCheckpointExceptionHandler(
boolean failTaskOnCheckpointException,
Environment environment) {

if (failTaskOnCheckpointException) {
return new FailingCheckpointExceptionHandler();
} else {
return new DecliningCheckpointExceptionHandler(environment);
}
}

/**
* This handler makes the task fail by rethrowing a reported exception.
*/
static final class FailingCheckpointExceptionHandler implements CheckpointExceptionHandler {

@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
Exception exception) throws Exception {

throw exception;
}
return new DecliningCheckpointExceptionHandler(environment);
}

/**
@@ -71,7 +51,7 @@ static final class DecliningCheckpointExceptionHandler implements CheckpointExce
@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
Exception exception) throws Exception {
Exception exception) {

environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
}
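A hedged usage sketch of the simplified factory (the surrounding environment, metadata, and exception values are assumed to come from the task context, and the import paths for these internal classes are assumptions, not shown in this diff): with the failing variant removed, the handler obtained here always declines the checkpoint so the task keeps running.

import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerFactory;

final class DecliningHandlerUsageSketch {
    // `environment`, `checkpointMetaData`, and `snapshotException` are assumed to come from the task context.
    static void declineInsteadOfFailing(
            Environment environment,
            CheckpointMetaData checkpointMetaData,
            Exception snapshotException) throws Exception {
        CheckpointExceptionHandler handler =
            new CheckpointExceptionHandlerFactory().createCheckpointExceptionHandler(environment);
        // The boolean flag is gone, so this always declines the checkpoint instead of rethrowing.
        handler.tryHandleCheckpointException(checkpointMetaData, snapshotException);
    }
}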
