From 36ef22bc3d5491c67165a3ee1d4e08663723260b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 16 Feb 2016 01:15:39 +0100 Subject: [PATCH 1/2] [FLINK-3410] [restart] Choose NoRestart strategy if the number of retries is set to 0 Add test case --- .../flink/api/common/ExecutionConfig.java | 16 ++-- .../streaming/api/RestartStrategyTest.java | 96 +++++++++++++++++++ 2 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index b31106f4a1908..1f07dd74c0dd6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -68,6 +68,8 @@ public class ExecutionConfig implements Serializable { */ public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE; + private static final long DEFAULT_RESTART_DELAY = 10000L; + // -------------------------------------------------------------------------------------------- /** Defines how data exchange happens - batch or pipelined */ @@ -81,7 +83,7 @@ public class ExecutionConfig implements Serializable { * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration */ @Deprecated - private int numberOfExecutionRetries = 0; + private int numberOfExecutionRetries = -1; private boolean forceKryo = false; @@ -106,7 +108,7 @@ public class ExecutionConfig implements Serializable { * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration */ @Deprecated - private long executionRetryDelay = 0; + private long executionRetryDelay = DEFAULT_RESTART_DELAY; private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; @@ -284,6 +286,8 @@ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() { // support the old API calls by creating a restart strategy from them if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) { return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay()); + } else if (getNumberOfExecutionRetries() == 0) { + return RestartStrategies.noRestart(); } else { return null; } @@ -342,8 +346,8 @@ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) } /** - * Sets the delay between executions. A value of {@code -1} indicates that the default value - * should be used. + * Sets the delay between executions. + * * @param executionRetryDelay The number of milliseconds the system will wait to retry. * * @return The current execution configuration @@ -354,9 +358,9 @@ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) */ @Deprecated public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { - if (executionRetryDelay < -1 ) { + if (executionRetryDelay < 0 ) { throw new IllegalArgumentException( - "The delay between reties must be non-negative, or -1 (use system default)"); + "The delay between reties must be non-negative."); } this.executionRetryDelay = executionRetryDelay; return this; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java new file mode 100644 index 0000000000000..9f75727f0cd14 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.junit.Assert; +import org.junit.Test; + +public class RestartStrategyTest { + + /** + * Tests that in a streaming use case where checkpointing is enabled, a + * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart + * strategy has been specified + */ + @Test + public void testAutomaticRestartingWhenCheckpointing() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(); + + env.fromElements(1).print(); + + StreamGraph graph = env.getStreamGraph(); + JobGraph jobGraph = graph.getJobGraph(); + + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + + Assert.assertNotNull(restartStrategy); + Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); + Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts()); + } + + /** + * Checks that in a streaming use case where checkpointing is enabled and the number + * of execution retries is set to 0, restarting is deactivated + */ + @Test + public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(); + env.setNumberOfExecutionRetries(0); + + env.fromElements(1).print(); + + StreamGraph graph = env.getStreamGraph(); + JobGraph jobGraph = graph.getJobGraph(); + + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + + Assert.assertNotNull(restartStrategy); + Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration); + } + + /** + * Checks that in a streaming use case where checkpointing is enabled and the number + * of execution retries is set to 42 and the delay to 1337, fixed delay restarting is used. + */ + @Test + public void testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(); + env.setNumberOfExecutionRetries(42); + env.getConfig().setExecutionRetryDelay(1337); + + env.fromElements(1).print(); + + StreamGraph graph = env.getStreamGraph(); + JobGraph jobGraph = graph.getJobGraph(); + + RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration(); + + Assert.assertNotNull(restartStrategy); + Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration); + Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getRestartAttempts()); + Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttempts()); + } +} From 1bcc73e40bc5c73781a54938150559705a55f8ac Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 16 Feb 2016 16:07:28 +0100 Subject: [PATCH 2/2] Fix typo --- .../main/java/org/apache/flink/api/common/ExecutionConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 1f07dd74c0dd6..a59268009826c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -360,7 +360,7 @@ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { if (executionRetryDelay < 0 ) { throw new IllegalArgumentException( - "The delay between reties must be non-negative."); + "The delay between retries must be non-negative."); } this.executionRetryDelay = executionRetryDelay; return this;