diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 0a60f282adb09..204f9fc12d50e 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -20,7 +20,7 @@
jobmanager.execution.failover-strategy |
"full" |
- This option specifies how the job computation recovers from task failures. Accepted values are:- 'full': Restarts all tasks to recover the job.
- 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.
|
+ This option specifies how the job computation recovers from task failures. Accepted values are:- 'full': Restarts all tasks to recover the job.
- 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.
- 'region-fast': It behaves the same as 'region' but has better performance to determine tasks to restart. This improvement would help if the job scale is large. The side effect is longer region building time and more memory for cache.
|
jobmanager.heap.size |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index f35977b095320..c39a7f9220540 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -124,7 +124,10 @@ public class JobManagerOptions {
"More details can be found %s.",
link(
"../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy",
- "here"))
+ "here")),
+ text("'region-fast': It behaves the same as 'region' but has better performance to " +
+ "determine tasks to restart. This improvement would help if the job scale is large. " +
+ "The side effect is longer region building time and more memory for cache.")
).build());
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index c73114e32c304..0c415bcccfa14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FastRestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -72,12 +73,21 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
/** The versioner helps to maintain execution vertex versions. */
private final ExecutionVertexVersioner executionVertexVersioner;
+ /** Whether to use the {@link FastRestartPipelinedRegionStrategy} instead of
+ * {@link RestartPipelinedRegionStrategy} as the underlying failover strategy. */
+ private final boolean fastMode;
+
/** The underlying new generation region failover strategy. */
private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
+ this(executionGraph, false);
+ }
+
+ public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph, final boolean fastMode) {
this.executionGraph = checkNotNull(executionGraph);
this.executionVertexVersioner = new ExecutionVertexVersioner();
+ this.fastMode = fastMode;
}
@Override
@@ -293,9 +303,16 @@ public void notifyNewVertices(final List newJobVerticesTopol
// otherwise the failover topology will not be correctly built.
// currently it's safe to add it here, as this method is invoked only once in production code.
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
- this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
- executionGraph.getFailoverTopology(),
- executionGraph.getResultPartitionAvailabilityChecker());
+
+ if (fastMode) {
+ this.restartPipelinedRegionStrategy = new FastRestartPipelinedRegionStrategy(
+ executionGraph.getFailoverTopology(),
+ executionGraph.getResultPartitionAvailabilityChecker());
+ } else {
+ this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
+ executionGraph.getFailoverTopology(),
+ executionGraph.getResultPartitionAvailabilityChecker());
+ }
}
@Override
@@ -312,9 +329,15 @@ public String getStrategyName() {
*/
public static class Factory implements FailoverStrategy.Factory {
+ private final boolean fastMode;
+
+ Factory(final boolean fastMode) {
+ this.fastMode = fastMode;
+ }
+
@Override
public FailoverStrategy create(final ExecutionGraph executionGraph) {
- return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph);
+ return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph, fastMode);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index e0818275fee09..77fa769361855 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -41,6 +41,9 @@ public class FailoverStrategyLoader {
/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";
+ /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG} but use it in fast mode. */
+ public static final String FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-fast";
+
/** Config name for the {@link NoOpFailoverStrategy}. */
public static final String NO_OP_FAILOVER_STRATEGY = "noop";
@@ -70,7 +73,10 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config
return new RestartAllStrategy.Factory();
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
- return new AdaptedRestartPipelinedRegionStrategyNG.Factory();
+ return new AdaptedRestartPipelinedRegionStrategyNG.Factory(false);
+
+ case FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME:
+ return new AdaptedRestartPipelinedRegionStrategyNG.Factory(true);
case INDIVIDUAL_RESTART_STRATEGY_NAME:
return new RestartIndividualStrategy.Factory();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 181743a25d37d..832a03544d738 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -37,6 +37,7 @@
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
@@ -64,6 +65,10 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Collections;
@@ -83,6 +88,7 @@
/**
* Tests for region failover with multi regions.
*/
+@RunWith(Parameterized.class)
public class RegionFailoverITCase extends TestLogger {
private static final int FAIL_BASE = 1000;
@@ -109,10 +115,21 @@ public class RegionFailoverITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @Parameter
+ public String failoverStrategyName;
+
+ @Parameters(name = "[{index}] failover strategy: {0}")
+ public static Object[] failoverStrategies() {
+ return new Object[] {
+ FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
+ FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
+ };
+ }
+
@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
+ configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);
configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());
// global failover times: 3, region failover times: NUM_OF_RESTARTS
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index bf12e111823b1..762dd88873392 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -30,6 +30,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
@@ -64,6 +65,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +94,6 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;
-import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -120,6 +122,7 @@
* lost results.
*
*/
+@RunWith(Parameterized.class)
public class BatchFineGrainedRecoveryITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(BatchFineGrainedRecoveryITCase.class);
@@ -177,11 +180,22 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
private static GlobalMapFailureTracker failureTracker;
+ @Parameterized.Parameter
+ public String failoverStrategyName;
+
+ @Parameterized.Parameters(name = "[{index}] failover strategy: {0}")
+ public static Object[] failoverStrategies() {
+ return new Object[] {
+ FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
+ FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
+ };
+ }
+
@SuppressWarnings("OverlyBroadThrowsClause")
@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, PIPELINED_REGION_RESTART_STRATEGY_NAME);
+ configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);
miniCluster = new TestingMiniCluster(
new Builder()