[FLINK-3190] failure rate restart strategy
Add toString method to Time

Reintroduce org.apache.flink.streaming.api.windowing.time.Time for backwards compatibility

Remove Duration from FailureRateRestartStrategy

This closes #1954.
fijolekProjects authored and tillrohrmann committed Jul 11, 2016
1 parent 81cf229 commit a7274d5
Showing 16 changed files with 734 additions and 344 deletions.
91 changes: 83 additions & 8 deletions docs/apis/streaming/fault_tolerance.md
@@ -243,6 +243,10 @@ The description of each restart strategy contains more information about the res
<td>Fixed delay</td> <td>Fixed delay</td>
<td>fixed-delay</td> <td>fixed-delay</td>
</tr> </tr>
<tr>
<td>Failure rate</td>
<td>failure-rate</td>
</tr>
<tr> <tr>
<td>No restart</td> <td>No restart</td>
<td>none</td> <td>none</td>
@@ -261,18 +265,18 @@ In case of a failure the system tries to restart the job 3 times and waits 10 se
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
{% highlight java %} {% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelay( env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts 3, // number of restart attempts
10000 // delay in milliseconds Time.of(10, TimeUnit.SECONDS) // delay
)); ));
{% endhighlight %} {% endhighlight %}
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment() val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelay( env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts 3, // number of restart attempts
10000 // delay in milliseconds Time.of(10, TimeUnit.SECONDS) // delay
)) ))
{% endhighlight %} {% endhighlight %}
</div> </div>
@@ -325,18 +329,18 @@ The fixed delay restart strategy can also be set programmatically:
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
{% highlight java %} {% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelay( env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts 3, // number of restart attempts
10000 // delay in milliseconds Time.of(10, TimeUnit.SECONDS) // delay
)); ));
{% endhighlight %} {% endhighlight %}
</div> </div>
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
{% highlight scala %} {% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment() val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelay( env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts 3, // number of restart attempts
10000 // delay in milliseconds Time.of(10, TimeUnit.SECONDS) // delay
)) ))
{% endhighlight %} {% endhighlight %}
</div> </div>
@@ -358,6 +362,77 @@ The default value is the value of *akka.ask.timeout*.


{% top %} {% top %}


### Failure Rate Restart Strategy

The failure rate restart strategy restarts the job after a failure, but once the `failure rate` (failures per time interval) is exceeded, the job eventually fails.
Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.

~~~
restart-strategy: failure-rate
~~~

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 40%">Configuration Parameter</th>
<th class="text-left" style="width: 40%">Description</th>
<th class="text-left">Default Value</th>
</tr>
</thead>
<tbody>
<tr>
<td><it>restart-strategy.failure-rate.max-failures-per-interval</it></td>
<td>Maximum number of restarts in a given time interval before failing a job</td>
<td>1</td>
</tr>
<tr>
<td><it>restart-strategy.failure-rate.failure-rate-interval</it></td>
<td>Time interval for measuring failure rate.</td>
<td>1 minute</td>
</tr>
<tr>
<td><it>restart-strategy.failure-rate.delay</it></td>
<td>Delay between two consecutive restart attempts</td>
<td><it>akka.ask.timeout</it></td>
</tr>
</tbody>
</table>

~~~
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
~~~

The failure rate restart strategy can also be set programmatically:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
))
{% endhighlight %}
</div>
</div>

{% top %}

### No Restart Strategy ### No Restart Strategy


The job fails directly and no restart is attempted. The job fails directly and no restart is attempted.
12 changes: 11 additions & 1 deletion docs/setup/config.md
@@ -132,8 +132,9 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber
- `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine. - `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.


- `restart-strategy`: Default restart strategy to use in case that no restart strategy has been specified for the submitted job. - `restart-strategy`: Default restart strategy to use in case that no restart strategy has been specified for the submitted job.
Currently, it can be chosen between using a fixed delay restart strategy and to turn it off. Currently, it can be chosen from fixed delay restart strategy, failure rate restart strategy or no restart strategy.
To use the fixed delay strategy you have to specify "fixed-delay". To use the fixed delay strategy you have to specify "fixed-delay".
To use the failure rate strategy you have to specify "failure-rate".
To turn the restart behaviour off you have to specify "none". To turn the restart behaviour off you have to specify "none".
Default value "none". Default value "none".


@@ -143,6 +144,15 @@ Default value is 1.
- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay". - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
Default value is the `akka.ask.timeout`. Default value is the `akka.ask.timeout`.


- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy.
Default value is 1.

- `restart-strategy.failure-rate.failure-rate-interval`: Time interval for measuring failure rate in "failure-rate" strategy.
Default value is `1 minute`.

- `restart-strategy.failure-rate.delay`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate".
Default value is the `akka.ask.timeout`.

## Full Reference ## Full Reference


### HDFS ### HDFS
@@ -19,8 +19,10 @@
package org.apache.flink.api.common.restartstrategy; package org.apache.flink.api.common.restartstrategy;


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;


import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.TimeUnit;


/** /**
* This class defines methods to generate RestartStrategyConfigurations. These configurations are * This class defines methods to generate RestartStrategyConfigurations. These configurations are
@@ -47,11 +49,31 @@ public static RestartStrategyConfiguration noRestart() {
* @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
* @return FixedDelayRestartStrategy * @return FixedDelayRestartStrategy
*/ */
public static RestartStrategyConfiguration fixedDelayRestart( public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
int restartAttempts, return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
long delayBetweenAttempts) { }

/**
* Generates a FixedDelayRestartStrategyConfiguration.
*
* @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
* @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
* @return FixedDelayRestartStrategy
*/
public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
}


return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts); /**
* Generates a FailureRateRestartStrategyConfiguration.
*
* @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
* @param failureInterval Time interval for failures
* @param delayInterval Delay in-between restart attempts
*/
public static FailureRateRestartStrategyConfiguration failureRateRestart(
int failureRate, Time failureInterval, Time delayInterval) {
return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
} }


public abstract static class RestartStrategyConfiguration implements Serializable { public abstract static class RestartStrategyConfiguration implements Serializable {
@@ -80,41 +102,75 @@ final public static class FixedDelayRestartStrategyConfiguration extends Restart
private static final long serialVersionUID = 4149870149673363190L; private static final long serialVersionUID = 4149870149673363190L;


private final int restartAttempts; private final int restartAttempts;
private final long delayBetweenAttempts; private final Time delayBetweenAttemptsInterval;


FixedDelayRestartStrategyConfiguration(int restartAttempts, long delayBetweenAttempts) { FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
this.restartAttempts = restartAttempts; this.restartAttempts = restartAttempts;
this.delayBetweenAttempts = delayBetweenAttempts; this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
} }


public int getRestartAttempts() { public int getRestartAttempts() {
return restartAttempts; return restartAttempts;
} }


public long getDelayBetweenAttempts() { public Time getDelayBetweenAttemptsInterval() {
return delayBetweenAttempts; return delayBetweenAttemptsInterval;
} }


@Override @Override
public int hashCode() { public int hashCode() {
return 31 * restartAttempts + (int)(delayBetweenAttempts ^ (delayBetweenAttempts >>> 32)); int result = restartAttempts;
result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
return result;
} }


@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (obj instanceof FixedDelayRestartStrategyConfiguration) { if (obj instanceof FixedDelayRestartStrategyConfiguration) {
FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj; FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;


return restartAttempts == other.restartAttempts && delayBetweenAttempts == other.delayBetweenAttempts; return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
} else { } else {
return false; return false;
} }
} }


@Override @Override
public String getDescription() { public String getDescription() {
return "Restart with fixed delay (" + delayBetweenAttempts + " ms). #" return "Restart with fixed delay (" + delayBetweenAttemptsInterval + " ms). #"
+ restartAttempts + " restart attempts."; + restartAttempts + " restart attempts.";
} }
} }

final public static class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = 1195028697539661739L;
private final int maxFailureRate;

private final Time failureInterval;
private final Time delayBetweenAttemptsInterval;

public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
this.maxFailureRate = maxFailureRate;
this.failureInterval = failureInterval;
this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
}

public int getMaxFailureRate() {
return maxFailureRate;
}

public Time getFailureInterval() {
return failureInterval;
}

public Time getDelayBetweenAttemptsInterval() {
return delayBetweenAttemptsInterval;
}

@Override
public String getDescription() {
return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
}
}
} }
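
The documentation added in this commit describes the failure rate strategy as allowing at most a fixed number of restarts per time interval. One way to picture that rule is a sliding window over failure timestamps. The sketch below is only an illustration under that assumption and is not the strategy implementation from this commit; the class `FailureRateWindow` and its method `recordFailureAndCheck` are hypothetical names, and it uses plain millisecond timestamps rather than Flink's `Time` class.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink: keeps the timestamps of recent
// failures and decides whether another restart is still within the allowed rate.
final class FailureRateWindow {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateWindow(int maxFailuresPerInterval, long interval, TimeUnit unit) {
        if (maxFailuresPerInterval < 1 || interval <= 0) {
            throw new IllegalArgumentException("rate and interval must be positive");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = unit.toMillis(interval);
    }

    // Records a failure observed at nowMillis and returns true if the job
    // may still be restarted, false if the failure rate has been exceeded.
    boolean recordFailureAndCheck(long nowMillis) {
        // Forget failures that have fallen out of the measuring interval.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() >= intervalMillis) {
            failureTimestamps.pollFirst();
        }
        failureTimestamps.addLast(nowMillis);
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // Mirrors the documented example: 3 failures per 5 minutes.
        FailureRateWindow window = new FailureRateWindow(3, 5, TimeUnit.MINUTES);
        for (long t = 0; t < 4; t++) {
            System.out.println(window.recordFailureAndCheck(t * 1000L));
        }
    }
}
```

With the values from the docs example (3 failures per 5 minutes), the first three failures inside one window still permit a restart and the fourth does not. The delay between restart attempts is a separate setting and is not modelled here.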
