Skip to content

Commit

Permalink
[FLINK-3187] [restart] Introduce RestartStrategy to ExecutionGraph
Browse files Browse the repository at this point in the history
A RestartStrategy defines how the ExecutionGraph reacts in case of a restart. Different strategies
are conceivable. For example, no restart, fixed delay restart, exponential backoff restart, scaling
in/out restart, etc.

Expose RestartStrategy to user API

This removes the setNumberExecutionRetries and the setDelayBetweenRetries on the ExecutionEnvironment and
the ExecutionConfig. Instead the more general RestartStrategy can be set. In order to maintain the
separation between the runtime and api module, one sets a RestartStrategyConfiguration which is transformed
into a RestartStrategy on the JobManager.

Replace old execution-retries configuration parameters by restart-strategy.

Add FixedDelayRestartStrategy test case

Reintroduce old configuration values and API calls for the deprecated restart mechanism

The old configuration values and API calls will be respected if no explicit
RestartStrategy has been set. The values, if correct, are used to instantiate
a FixedDelayRestartStrategy.

Add deprecation comments to the JavaDocs

Add logging statement for job recovery

Fix JobManagerProcessFailureBatchRecoveryITCase by introducing a job recovery timeout

Add proper annotations to RestartStrategies

Let ExecutionGraphRestartTest extend TestLogger

This closes #1470.
  • Loading branch information
tillrohrmann committed Feb 15, 2016
1 parent b17632d commit 5eae47f
Show file tree
Hide file tree
Showing 73 changed files with 1,314 additions and 434 deletions.
168 changes: 168 additions & 0 deletions docs/apis/streaming/fault_tolerance.md
Expand Up @@ -194,3 +194,171 @@ state updates) of Flink coupled with bundled sinks:
</table>

{% top %}

Restart Strategies
------------------

Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.

The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
The configuration parameter *restart-strategy* defines which strategy is taken.
Per default, the no-restart strategy is used.
See the following list of available restart strategies to learn what values are supported.

Each restart strategy comes with its own set of parameters which control its behaviour.
These values are also set in the configuration file.
The description of each restart strategy contains more information about the respective configuration values.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 50%">Restart Strategy</th>
<th class="text-left">Value for restart-strategy</th>
</tr>
</thead>
<tbody>
<tr>
<td>Fixed delay</td>
<td>fixed-delay</td>
</tr>
<tr>
<td>No restart</td>
<td>none</td>
</tr>
</tbody>
</table>

Apart from defining a default restart strategy, it is possible to define for each Flink job a specific restart strategy.
This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
Note that this also works for the `StreamExecutionEnvironment`.

The following example shows how we can set a fixed delay restart strategy for our job.
In case of a failure the system tries to restart the job 3 times and waits 10 seconds in-between successive restart attempts.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelay(
3, // number of restart attempts
10000 // delay in milliseconds
));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelay(
3, // number of restart attempts
10000 // delay in milliseconds
))
{% endhighlight %}
</div>
</div>

## Fixed Delay Restart Strategy

The fixed delay restart strategy attempts a given number of times to restart the job.
If the maximum number of attempts is exceeded, the job eventually fails.
In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

This strategy is enabled as default by setting the following configuration parameter in `flink-conf.yaml`.

~~~
restart-strategy: fixed-delay
~~~

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 40%">Configuration Parameter</th>
<th class="text-left" style="width: 40%">Description</th>
<th class="text-left">Default Value</th>
</tr>
</thead>
<tbody>
<tr>
<td><it>restart-strategy.fixed-delay.attempts</it></td>
<td>Number of restart attempts</td>
<td>1</td>
</tr>
<tr>
<td><it>restart-strategy.fixed-delay.delay</it></td>
<td>Delay between two consecutive restart attempts</td>
<td><it>akka.ask.timeout</it></td>
</tr>
</tbody>
</table>

~~~
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
~~~

The fixed delay restart strategy can also be set programmatically:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelay(
3, // number of restart attempts
10000 // delay in milliseconds
));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelay(
3, // number of restart attempts
10000 // delay in milliseconds
))
{% endhighlight %}
</div>
</div>

### Restart Attempts

The number of times that Flink retries the execution before the job is declared as failed is configurable via the *restart-strategy.fixed-delay.attempts* parameter.

The default value is **1**.

### Retry Delays

Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.

Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.

The default value is the value of *akka.ask.timeout*.

## No Restart Strategy

The job fails directly and no restart is attempted.

~~~
restart-strategy: none
~~~

The no restart strategy can also be set programmatically:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
{% endhighlight %}
</div>
</div>

[Back to top](#top)


2 changes: 1 addition & 1 deletion docs/internals/monitoring_rest_api.md
Expand Up @@ -246,7 +246,7 @@ Sample Result:
"name": "WordCount Example",
"execution-config": {
"execution-mode": "PIPELINED",
"max-execution-retries": -1,
"restart-strategy": "Restart deactivated",
"job-parallelism": -1,
"object-reuse-mode": false
}
Expand Down
16 changes: 13 additions & 3 deletions docs/setup/config.md
Expand Up @@ -118,9 +118,17 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber

- `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

- `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values have to be specified as strings with a unit.

- `execution-retries.default`: Default number of execution retries, used by jobs that do not explicitly specify that value on the execution environment. Default value is zero.
- `restart-strategy`: Default restart strategy to use in case that no restart strategy has been specified for the submitted job.
Currently, it can be chosen between using a fixed delay restart strategy and to turn it off.
To use the fixed delay strategy you have to specify "fixed-delay".
To turn the restart behaviour off you have to specify "none".
Default value "none".

- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default restart strategy is set to "fixed-delay".
Default value is 1.

- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
Default value is the `akka.ask.timeout`.

## Full Reference

Expand Down Expand Up @@ -247,6 +255,8 @@ For example when running Flink on YARN on an environment with a restrictive fire

- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.

- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.

## Background

### Configuring the Network Buffers
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.esotericsoftware.kryo.Serializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;

import java.io.Serializable;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -76,7 +77,11 @@ public class ExecutionConfig implements Serializable {

private int parallelism = -1;

private int numberOfExecutionRetries = -1;
/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
@Deprecated
private int numberOfExecutionRetries = 0;

private boolean forceKryo = false;

Expand All @@ -96,9 +101,15 @@ public class ExecutionConfig implements Serializable {
private long autoWatermarkInterval = 0;

private boolean timestampsEnabled = false;

private long executionRetryDelay = -1;

/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
@Deprecated
private long executionRetryDelay = 0;

private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;

// Serializers and types registered with Kryo and the PojoSerializer
// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.

Expand Down Expand Up @@ -244,20 +255,65 @@ public ExecutionConfig setParallelism(int parallelism) {
return this;
}

/**
* Sets the restart strategy to be used for recovery.
*
* <pre>{@code
* ExecutionConfig config = env.getConfig();
*
* config.setRestartStrategy(RestartStrategies.fixedDelayRestart(
* 10, // number of retries
* 1000 // delay between retries));
* }</pre>
*
* @param restartStrategyConfiguration Configuration defining the restart strategy to use
*/
@PublicEvolving
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
this.restartStrategyConfiguration = restartStrategyConfiguration;
}

/**
* Returns the restart strategy which has been set for the current job.
*
* @return The specified restart configuration
*/
@PublicEvolving
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
if (restartStrategyConfiguration == null) {
// support the old API calls by creating a restart strategy from them
if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay());
} else {
return null;
}
} else {
return restartStrategyConfiguration;
}
}

/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of {@code -1} indicates that the system default value (as defined in the configuration)
* should be used.
*
* @return The number of times the system will try to re-execute failed tasks.
*
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
@Deprecated
public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries;
}

/**
* Returns the delay between execution retries.
*
* @return The delay between successive execution retries in milliseconds.
*
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
@Deprecated
public long getExecutionRetryDelay() {
return executionRetryDelay;
}
Expand All @@ -268,11 +324,18 @@ public long getExecutionRetryDelay() {
* default value (as defined in the configuration) should be used.
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*
* @return The current execution configuration
*
* @deprecated This method will be replaced by {@link #setRestartStrategy}. The
* {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
* execution retries.
*/
@Deprecated
public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
throw new IllegalArgumentException(
"The number of execution retries must be non-negative, or -1 (use system default)");
"The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
return this;
Expand All @@ -282,15 +345,23 @@ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries)
* Sets the delay between executions. A value of {@code -1} indicates that the default value
* should be used.
* @param executionRetryDelay The number of milliseconds the system will wait to retry.
*
* @return The current execution configuration
*
* @deprecated This method will be replaced by {@link #setRestartStrategy}. The
* {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the delay between
* successive execution attempts.
*/
@Deprecated
public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
if (executionRetryDelay < -1 ) {
throw new IllegalArgumentException(
"The delay between reties must be non-negative, or -1 (use system default)");
"The delay between reties must be non-negative, or -1 (use system default)");
}
this.executionRetryDelay = executionRetryDelay;
return this;
}

/**
* Sets the execution mode to execute the program. The execution mode defines whether
* data exchanges are performed in a batch or on a pipelined manner.
Expand Down Expand Up @@ -614,7 +685,7 @@ public boolean equals(Object obj) {
Objects.equals(executionMode, other.executionMode) &&
useClosureCleaner == other.useClosureCleaner &&
parallelism == other.parallelism &&
numberOfExecutionRetries == other.numberOfExecutionRetries &&
restartStrategyConfiguration.equals(other.restartStrategyConfiguration) &&
forceKryo == other.forceKryo &&
objectReuse == other.objectReuse &&
autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
Expand All @@ -640,7 +711,7 @@ public int hashCode() {
executionMode,
useClosureCleaner,
parallelism,
numberOfExecutionRetries,
restartStrategyConfiguration,
forceKryo,
objectReuse,
autoTypeRegistrationEnabled,
Expand Down

0 comments on commit 5eae47f

Please sign in to comment.