From f511ac9adc789f75761f2f997c294966cfe05b93 Mon Sep 17 00:00:00 2001 From: Nagarjun Date: Wed, 5 Oct 2016 00:26:05 -0700 Subject: [PATCH 1/2] FLINK-4596: Fallback restart strategy config to let jobs choose restart configuration set at cluster level --- .../common/restartstrategy/RestartStrategies.java | 13 +++++++++++++ .../restart/RestartStrategyFactory.java | 2 ++ .../flink/runtime/jobmanager/JobManager.scala | 7 ++++--- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index d5db466e7a9ff..032163853974b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -42,6 +42,10 @@ public static RestartStrategyConfiguration noRestart() { return new NoRestartStrategyConfiguration(); } + public static RestartStrategyConfiguration fallBackRestart() { + return new FallbackRestartStrategyConfiguration(); + } + /** * Generates a FixedDelayRestartStrategyConfiguration. * @@ -173,4 +177,13 @@ public String getDescription() { + " and fixed delay " + delayBetweenAttemptsInterval.toString(); } } + + final public static class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{ + private static final long serialVersionUID = -4441787204284085544L; + + @Override + public String getDescription() { + return "Cluster Default Restart"; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index 870bf632861f5..27ee9b61eec07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -66,6 +66,8 @@ public static RestartStrategy createRestartStrategy(RestartStrategies.RestartStr config.getFailureInterval(), config.getDelayBetweenAttemptsInterval() ); + } else if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) { + return null; } else { throw new IllegalArgumentException("Unknown restart strategy configuration " + restartStrategyConfiguration + "."); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 01f9cec4646aa..7bb604c740161 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1114,7 +1114,8 @@ class JobManager( Option(jobGraph.getSerializedExecutionConfig() .deserializeValue(userCodeLoader) .getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy(_)) match { + .map(RestartStrategyFactory.createRestartStrategy(_)) + .filter(p => p != null) match { case Some(strategy) => strategy case None => restartStrategyFactory.createRestartStrategy() } @@ -1539,7 +1540,7 @@ class JobManager( case _ => unhandled(actorMsg) } } - + /** * Handle unmatched messages with an exception. */ @@ -2750,7 +2751,7 @@ object JobManager { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) case None => actorSystem.actorOf(jobManagerProps) } - + metricsRegistry match { case Some(registry) => registry.startQueryService(actorSystem) From a25f7b62c11c6ab9bd7586003833c1263bc1157b Mon Sep 17 00:00:00 2001 From: Nagarjun Date: Thu, 3 Nov 2016 23:29:57 -0700 Subject: [PATCH 2/2] Added java doc for fallback restart strategy --- .../flink/api/common/restartstrategy/RestartStrategies.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index 032163853974b..72ee13217351e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -178,12 +178,15 @@ public String getDescription() { } } + /** + * Restart strategy configuration that could be used by jobs to use cluster level restart strategy. Useful especially when one has a custom implementation of restart strategy set via yaml + */ final public static class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{ private static final long serialVersionUID = -4441787204284085544L; @Override public String getDescription() { - return "Cluster Default Restart"; + return "Cluster level default restart strategy"; } } }