diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index f35977b095320e..c39a7f92205406 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -124,7 +124,10 @@ public class JobManagerOptions { "More details can be found %s.", link( "../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy", - "here")) + "here")), + text("'region-fast': It behaves the same as 'region' but has better performance to " + + "determine tasks to restart. This improvement would help if the job scale is large. " + + "The side effect is longer region building time and more memory for cache.") ).build()); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index c73114e32c3044..0c415bcccfa142 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; import org.apache.flink.runtime.executiongraph.SchedulingUtils; +import org.apache.flink.runtime.executiongraph.failover.flip1.FastRestartPipelinedRegionStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; @@ -72,12 +73,21 @@ public 
class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { /** The versioner helps to maintain execution vertex versions. */ private final ExecutionVertexVersioner executionVertexVersioner; + /** Whether to use the {@link FastRestartPipelinedRegionStrategy} instead of + * {@link RestartPipelinedRegionStrategy} as the underlying failover strategy. */ + private final boolean fastMode; + /** The underlying new generation region failover strategy. */ private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this(executionGraph, false); + } + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph, final boolean fastMode) { this.executionGraph = checkNotNull(executionGraph); this.executionVertexVersioner = new ExecutionVertexVersioner(); + this.fastMode = fastMode; } @Override @@ -293,9 +303,16 @@ public void notifyNewVertices(final List newJobVerticesTopol // otherwise the failover topology will not be correctly built. // currently it's safe to add it here, as this method is invoked only once in production code. 
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once"); - this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy( - executionGraph.getFailoverTopology(), - executionGraph.getResultPartitionAvailabilityChecker()); + + if (fastMode) { + this.restartPipelinedRegionStrategy = new FastRestartPipelinedRegionStrategy( + executionGraph.getFailoverTopology(), + executionGraph.getResultPartitionAvailabilityChecker()); + } else { + this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy( + executionGraph.getFailoverTopology(), + executionGraph.getResultPartitionAvailabilityChecker()); + } } @Override @@ -312,9 +329,15 @@ public String getStrategyName() { */ public static class Factory implements FailoverStrategy.Factory { + private final boolean fastMode; + + Factory(final boolean fastMode) { + this.fastMode = fastMode; + } + @Override public FailoverStrategy create(final ExecutionGraph executionGraph) { - return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph); + return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph, fastMode); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index e0818275fee092..77fa7693618551 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -41,6 +41,9 @@ public class FailoverStrategyLoader { /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */ public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region"; + /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG} running in fast mode. 
*/ + public static final String FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-fast"; + /** Config name for the {@link NoOpFailoverStrategy}. */ public static final String NO_OP_FAILOVER_STRATEGY = "noop"; @@ -70,7 +73,10 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: - return new AdaptedRestartPipelinedRegionStrategyNG.Factory(); + return new AdaptedRestartPipelinedRegionStrategyNG.Factory(false); + + case FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME: + return new AdaptedRestartPipelinedRegionStrategyNG.Factory(true); case INDIVIDUAL_RESTART_STRATEGY_NAME: return new RestartIndividualStrategy.Factory(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 181743a25d37da..832a03544d738a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; @@ -64,6 +65,10 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import 
org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.util.Collections; @@ -83,6 +88,7 @@ /** * Tests for region failover with multi regions. */ +@RunWith(Parameterized.class) public class RegionFailoverITCase extends TestLogger { private static final int FAIL_BASE = 1000; @@ -109,10 +115,21 @@ public class RegionFailoverITCase extends TestLogger { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Parameter + public String failoverStrategyName; + + @Parameters(name = "[{index}] failover strategy: {0}") + public static Object[] failoverStrategies() { + return new Object[] { + FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME, + FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME + }; + } + @Before public void setup() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName); configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName()); // global failover times: 3, region failover times: NUM_OF_RESTARTS diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java index bf12e111823b18..762dd888733922 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview; @@ -64,6 +65,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +94,6 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; -import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -120,6 +122,7 @@ * lost results. * */ +@RunWith(Parameterized.class) public class BatchFineGrainedRecoveryITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(BatchFineGrainedRecoveryITCase.class); @@ -177,11 +180,22 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger { private static GlobalMapFailureTracker failureTracker; + @Parameterized.Parameter + public String failoverStrategyName; + + @Parameterized.Parameters(name = "[{index}] failover strategy: {0}") + public static Object[] failoverStrategies() { + return new Object[] { + FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME, + FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME + }; + } + @SuppressWarnings("OverlyBroadThrowsClause") @Before public void setup() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, PIPELINED_REGION_RESTART_STRATEGY_NAME); + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName); miniCluster = new TestingMiniCluster( new Builder()