Skip to content

Commit

Permalink
[FLINK-8977] [e2e] Allow configuring restart strategy for general pur…
Browse files Browse the repository at this point in the history
…pose DataStream job
  • Loading branch information
tzulitai committed May 22, 2018
1 parent d7ec5a9 commit 22e400d
Showing 1 changed file with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
* <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
* <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
* <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
* <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
* <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
* <li>environment.restart_strategy.fixed_delay.attempts (Integer, default - Integer.MAX_VALUE): The number of allowed attempts to restart the job, when using 'fixed_delay' restart.</li>
* <li>environment.restart_strategy.fixed_delay.delay (long, default - 0): delay between restart attempts, in milliseconds, when using 'fixed_delay' restart.</li>
* <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
* <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
* <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
Expand Down Expand Up @@ -124,9 +126,17 @@ class DataStreamAllroundTestJobFactory {
.key("environment.max_parallelism")
.defaultValue(128);

private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
.key("environment.restart_strategy.delay")
.defaultValue(0);
private static final ConfigOption<String> ENVIRONMENT_RESTART_STRATEGY = ConfigOptions
.key("environment.restart_strategy")
.defaultValue("fixed_delay");

private static final ConfigOption<Integer> ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS = ConfigOptions
.key("environment.restart_strategy.fixed_delay.attempts")
.defaultValue(Integer.MAX_VALUE);

private static final ConfigOption<Long> ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY = ConfigOptions
.key("environment.restart_strategy.fixed.delay")
.defaultValue(0L);

private static final ConfigOption<Boolean> ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions
.key("environment.externalize_checkpoint")
Expand Down Expand Up @@ -199,9 +209,27 @@ static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) t
env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));

// restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
Integer.MAX_VALUE,
pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
String restartStrategyConfig = pt.get(ENVIRONMENT_RESTART_STRATEGY.key());
if (restartStrategyConfig != null) {
RestartStrategies.RestartStrategyConfiguration restartStrategy;
switch (restartStrategyConfig) {
case "fixed_delay":
restartStrategy = RestartStrategies.fixedDelayRestart(
pt.getInt(
ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.key(),
ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.defaultValue()),
pt.getLong(
ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.key(),
ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.defaultValue()));
break;
case "no_restart":
restartStrategy = RestartStrategies.noRestart();
break;
default:
throw new IllegalArgumentException("Unkown restart strategy: " + restartStrategyConfig);
}
env.setRestartStrategy(restartStrategy);
}

// state backend
final String stateBackend = pt.get(
Expand Down

0 comments on commit 22e400d

Please sign in to comment.