Skip to content

Commit

Permalink
BDI-5971 Introduce restart-strategy.non-recoverable-exceptions to c…
Browse files Browse the repository at this point in the history
…onfigure Restart Strategy not restarting job for specific exceptions (apache#12)

* Implement non-recoverable-exception configuration for RestartStrategies
* FallbackRestartStrategy supports the non-recoverable configuration
* Add unit tests
* format code
  • Loading branch information
Jie Wang authored and GitHub Enterprise committed Jul 6, 2021
1 parent 838db22 commit 401614e
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@Deprecated private long executionRetryDelay = DEFAULT_RESTART_DELAY;

@Deprecated private List<String> nonRecoverableExceptions = Collections.emptyList();

private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
new RestartStrategies.FallbackRestartStrategyConfiguration();

Expand Down Expand Up @@ -426,7 +428,9 @@ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
// support the old API calls by creating a restart strategy from them
if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
return RestartStrategies.fixedDelayRestart(
getNumberOfExecutionRetries(), getExecutionRetryDelay());
getNumberOfExecutionRetries(),
getExecutionRetryDelay(),
nonRecoverableExceptions);
} else if (getNumberOfExecutionRetries() == 0) {
return RestartStrategies.noRestart();
} else {
Expand Down Expand Up @@ -1117,6 +1121,13 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION)
.ifPresent(this::setUseSnapshotCompression);
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
// if FallbackRestartStrategy is used, try to get the non-recoverable exception
// configuration
if (restartStrategyConfiguration
instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
this.nonRecoverableExceptions =
RestartStrategies.getNonRecoverableExceptions(configuration);
}
configuration
.getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
.map(s -> parseKryoSerializersWithExceptionHandling(classLoader, s))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ public class RestartStrategyOptions {
code(RESTART_STRATEGY.key()), code("exponential-delay"))
.build());

/**
 * Config option with key {@code restart-strategy.non-recoverable-exceptions}.
 *
 * <p>The option is string-typed and defaults to the empty string. Per its description, Flink
 * does not try to restart the job when the job fails because of one of the configured
 * non-recoverable exceptions.
 *
 * <p>NOTE(review): the value is a single string; how several exceptions are encoded in it
 * (presumably a delimiter-separated list of class names) is decided by the code that parses
 * this option, which is not shown here - confirm before documenting the format for users.
 */
public static final ConfigOption<String> RESTART_STRATEGY_NON_RECOVERABLE_EXCEPTIONS =
ConfigOptions.key("restart-strategy.non-recoverable-exceptions")
.stringType()
.defaultValue("")
.withDescription(
Description.builder()
.text(
"Non-recoverable exceptions. Flink will not try to restart the job when job fails because of these non-recoverable exceptions.")
.build());

/**
 * Private constructor that prevents instantiation.
 *
 * @throws UnsupportedOperationException always
 */
private RestartStrategyOptions() {
throw new UnsupportedOperationException("This class should never be instantiated.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -55,13 +57,18 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof

private long lastFailureTimestamp;

private final List<String> nonRecoverableExceptions;

private boolean recoverable;

ExponentialDelayRestartBackoffTimeStrategy(
Clock clock,
long initialBackoffMS,
long maxBackoffMS,
double backoffMultiplier,
long resetBackoffThresholdMS,
double jitterFactor) {
double jitterFactor,
List<String> nonRecoverableExceptions) {

checkArgument(initialBackoffMS >= 1, "Initial backoff must be at least 1.");
checkArgument(maxBackoffMS >= 1, "Maximum backoff must be at least 1.");
Expand All @@ -84,12 +91,31 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof

this.clock = checkNotNull(clock);
this.lastFailureTimestamp = 0;
this.nonRecoverableExceptions = nonRecoverableExceptions;
this.strategyString = generateStrategyString();
this.recoverable = true;
}

/**
 * Convenience constructor for a strategy without any non-recoverable exceptions.
 *
 * <p>Forwards all six arguments unchanged to the seven-argument constructor and passes an
 * empty list as {@code nonRecoverableExceptions}.
 */
ExponentialDelayRestartBackoffTimeStrategy(
Clock clock,
long initialBackoffMS,
long maxBackoffMS,
double backoffMultiplier,
long resetBackoffThresholdMS,
double jitterFactor) {
this(
clock,
initialBackoffMS,
maxBackoffMS,
backoffMultiplier,
resetBackoffThresholdMS,
jitterFactor,
Collections.emptyList());
}

@Override
public boolean canRestart() {
return true;
return recoverable;
}

@Override
Expand All @@ -107,6 +133,10 @@ public void notifyFailure(Throwable cause) {
increaseBackoff();
}
lastFailureTimestamp = now;
String causeClassName = cause.getClass().getName();
if (nonRecoverableExceptions.contains(causeClassName)) {
recoverable = false;
}
}

@Override
Expand Down Expand Up @@ -157,6 +187,10 @@ private String generateStrategyString() {
+ currentBackoffMS
+ ", lastFailureTimestamp="
+ lastFailureTimestamp
+ (nonRecoverableExceptions.isEmpty()
? ""
: ", nonRecoverableExceptions="
+ String.join("|", nonRecoverableExceptions))
+ ")";
}

Expand Down Expand Up @@ -202,18 +236,36 @@ public static class ExponentialDelayRestartBackoffTimeStrategyFactory implements
private final double backoffMultiplier;
private final long resetBackoffThresholdMS;
private final double jitterFactor;
private final List<String> nonRecoverableExceptions;

public ExponentialDelayRestartBackoffTimeStrategyFactory(
long initialBackoffMS,
long maxBackoffMS,
double backoffMultiplier,
long resetBackoffThresholdMS,
double jitterFactor) {
double jitterFactor,
List<String> nonRecoverableExceptions) {
this.initialBackoffMS = initialBackoffMS;
this.maxBackoffMS = maxBackoffMS;
this.backoffMultiplier = backoffMultiplier;
this.resetBackoffThresholdMS = resetBackoffThresholdMS;
this.jitterFactor = jitterFactor;
this.nonRecoverableExceptions = nonRecoverableExceptions;
}

/**
 * Creates a factory whose strategies have no non-recoverable exceptions configured.
 *
 * <p>Forwards all five arguments unchanged to the six-argument constructor and passes an
 * empty list as {@code nonRecoverableExceptions}.
 */
public ExponentialDelayRestartBackoffTimeStrategyFactory(
long initialBackoffMS,
long maxBackoffMS,
double backoffMultiplier,
long resetBackoffThresholdMS,
double jitterFactor) {
this(
initialBackoffMS,
maxBackoffMS,
backoffMultiplier,
resetBackoffThresholdMS,
jitterFactor,
Collections.emptyList());
}

@Override
Expand All @@ -224,7 +276,8 @@ public RestartBackoffTimeStrategy create() {
maxBackoffMS,
backoffMultiplier,
resetBackoffThresholdMS,
jitterFactor);
jitterFactor,
nonRecoverableExceptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.flink.util.clock.SystemClock;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -44,8 +46,16 @@ public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTime

private final Clock clock;

private final List<String> nonRecoverableExceptions;

private boolean recoverable;

FailureRateRestartBackoffTimeStrategy(
Clock clock, int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {
Clock clock,
int maxFailuresPerInterval,
long failuresIntervalMS,
long backoffTimeMS,
List<String> nonRecoverableExceptions) {

checkArgument(
maxFailuresPerInterval > 0,
Expand All @@ -57,12 +67,27 @@ public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTime
this.backoffTimeMS = backoffTimeMS;
this.maxFailuresPerInterval = maxFailuresPerInterval;
this.failureTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
this.nonRecoverableExceptions = nonRecoverableExceptions;
this.strategyString = generateStrategyString();
this.clock = checkNotNull(clock);
this.recoverable = true;
}

/**
 * Convenience constructor for a strategy without any non-recoverable exceptions.
 *
 * <p>Forwards all four arguments unchanged to the five-argument constructor and passes an
 * empty list as {@code nonRecoverableExceptions}.
 */
FailureRateRestartBackoffTimeStrategy(
Clock clock, int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {
this(
clock,
maxFailuresPerInterval,
failuresIntervalMS,
backoffTimeMS,
Collections.emptyList());
}

@Override
public boolean canRestart() {
if (!recoverable) {
return false;
}
if (isFailureTimestampsQueueFull()) {
Long now = clock.absoluteTimeMillis();
Long earliestFailure = failureTimestamps.peek();
Expand All @@ -84,6 +109,10 @@ public void notifyFailure(Throwable cause) {
failureTimestamps.remove();
}
failureTimestamps.add(clock.absoluteTimeMillis());
String causeClassName = cause.getClass().getName();
if (nonRecoverableExceptions.contains(causeClassName)) {
recoverable = false;
}
}

@Override
Expand All @@ -103,6 +132,10 @@ private String generateStrategyString() {
str.append(backoffTimeMS);
str.append(",maxFailuresPerInterval=");
str.append(maxFailuresPerInterval);
if (!nonRecoverableExceptions.isEmpty()) {
str.append(", nonRecoverableExceptions=");
str.append(String.join("|", nonRecoverableExceptions));
}
str.append(")");

return str.toString();
Expand Down Expand Up @@ -139,12 +172,27 @@ public static class FailureRateRestartBackoffTimeStrategyFactory

private final long backoffTimeMS;

private final List<String> nonRecoverableExceptions;

public FailureRateRestartBackoffTimeStrategyFactory(
int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {
int maxFailuresPerInterval,
long failuresIntervalMS,
long backoffTimeMS,
List<String> nonRecoverableExceptions) {

this.maxFailuresPerInterval = maxFailuresPerInterval;
this.failuresIntervalMS = failuresIntervalMS;
this.backoffTimeMS = backoffTimeMS;
this.nonRecoverableExceptions = nonRecoverableExceptions;
}

/**
 * Creates a factory whose strategies have no non-recoverable exceptions configured.
 *
 * <p>Forwards all three arguments unchanged to the four-argument constructor and passes an
 * empty list as {@code nonRecoverableExceptions}.
 */
public FailureRateRestartBackoffTimeStrategyFactory(
int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {
this(
maxFailuresPerInterval,
failuresIntervalMS,
backoffTimeMS,
Collections.emptyList());
}

@Override
Expand All @@ -153,7 +201,8 @@ public RestartBackoffTimeStrategy create() {
SystemClock.getInstance(),
maxFailuresPerInterval,
failuresIntervalMS,
backoffTimeMS);
backoffTimeMS,
nonRecoverableExceptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;

import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
Expand All @@ -37,7 +40,14 @@ public class FixedDelayRestartBackoffTimeStrategy implements RestartBackoffTimeS

private int currentRestartAttempt;

FixedDelayRestartBackoffTimeStrategy(int maxNumberRestartAttempts, long backoffTimeMS) {
private final List<String> nonRecoverableExceptions;

private boolean recoverable;

FixedDelayRestartBackoffTimeStrategy(
int maxNumberRestartAttempts,
long backoffTimeMS,
List<String> nonRecoverableExceptions) {
checkArgument(
maxNumberRestartAttempts >= 0,
"Maximum number of restart attempts must be at least 0.");
Expand All @@ -47,12 +57,18 @@ public class FixedDelayRestartBackoffTimeStrategy implements RestartBackoffTimeS
this.maxNumberRestartAttempts = maxNumberRestartAttempts;
this.backoffTimeMS = backoffTimeMS;
this.currentRestartAttempt = 0;
this.nonRecoverableExceptions = nonRecoverableExceptions;
this.strategyString = generateStrategyString();
this.recoverable = true;
}

/**
 * Convenience constructor for a strategy without any non-recoverable exceptions.
 *
 * <p>Forwards both arguments unchanged to the three-argument constructor and passes an empty
 * list as {@code nonRecoverableExceptions}.
 */
FixedDelayRestartBackoffTimeStrategy(int maxNumberRestartAttempts, long backoffTimeMS) {
this(maxNumberRestartAttempts, backoffTimeMS, Collections.emptyList());
}

@Override
public boolean canRestart() {
return currentRestartAttempt <= maxNumberRestartAttempts;
return recoverable && currentRestartAttempt <= maxNumberRestartAttempts;
}

@Override
Expand All @@ -67,6 +83,10 @@ public int getMaxNumberRestartAttempts() {
/**
 * Records a failure and marks the strategy as non-recoverable when the failure's type is
 * configured as such.
 *
 * <p>The restart-attempt counter is always incremented first. The fully qualified class name
 * of {@code cause} itself is then looked up in {@code nonRecoverableExceptions}; the lookup is
 * an exact string match on that one class name, so neither nested causes nor superclasses of
 * {@code cause} are considered. On a match {@code recoverable} is set to {@code false}; this
 * method never sets it back to {@code true}.
 *
 * @param cause the failure being reported; dereferenced unconditionally
 */
@Override
public void notifyFailure(Throwable cause) {
currentRestartAttempt++;
final boolean isNonRecoverable =
nonRecoverableExceptions.contains(cause.getClass().getName());
if (isNonRecoverable) {
recoverable = false;
}
}

@Override
Expand All @@ -80,6 +100,10 @@ private String generateStrategyString() {
str.append(maxNumberRestartAttempts);
str.append(", backoffTimeMS=");
str.append(backoffTimeMS);
if (!nonRecoverableExceptions.isEmpty()) {
str.append(", nonRecoverableExceptions=");
str.append(String.join("|", nonRecoverableExceptions));
}
str.append(")");

return str.toString();
Expand All @@ -106,16 +130,26 @@ public static class FixedDelayRestartBackoffTimeStrategyFactory

private final long backoffTimeMS;

private final List<String> nonRecoverableExceptions;

public FixedDelayRestartBackoffTimeStrategyFactory(
int maxNumberRestartAttempts, long backoffTimeMS) {
int maxNumberRestartAttempts,
long backoffTimeMS,
List<String> nonRecoverableExceptions) {
this.maxNumberRestartAttempts = maxNumberRestartAttempts;
this.backoffTimeMS = backoffTimeMS;
this.nonRecoverableExceptions = nonRecoverableExceptions;
}

/**
 * Creates a factory whose strategies have no non-recoverable exceptions configured.
 *
 * <p>Forwards both arguments unchanged to the three-argument constructor and passes an empty
 * list as {@code nonRecoverableExceptions}.
 */
public FixedDelayRestartBackoffTimeStrategyFactory(
int maxNumberRestartAttempts, long backoffTimeMS) {
this(maxNumberRestartAttempts, backoffTimeMS, Collections.emptyList());
}

@Override
public RestartBackoffTimeStrategy create() {
return new FixedDelayRestartBackoffTimeStrategy(
maxNumberRestartAttempts, backoffTimeMS);
maxNumberRestartAttempts, backoffTimeMS, nonRecoverableExceptions);
}
}
}

0 comments on commit 401614e

Please sign in to comment.