Skip to content

Commit

Permalink
[FLINK-13056][runtime] Enabled fast region failover strategy for lega…
Browse files Browse the repository at this point in the history
…cy scheduler
  • Loading branch information
zhuzhurk committed Oct 29, 2019
1 parent 176de01 commit ef600d0
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 9 deletions.
Expand Up @@ -124,7 +124,10 @@ public class JobManagerOptions {
"More details can be found %s.",
link(
"../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy",
"here"))
"here")),
text("'region-fast': It behaves the same as 'region' but has better performance to " +
"determine tasks to restart. This improvement would help if the job scale is large. " +
"The side effect is longer region building time and more memory for cache.")
).build());

/**
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
import org.apache.flink.runtime.executiongraph.failover.flip1.FastRestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
Expand Down Expand Up @@ -72,12 +73,21 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
/** The versioner helps to maintain execution vertex versions. */
private final ExecutionVertexVersioner executionVertexVersioner;

/** Whether to use the {@link FastRestartPipelinedRegionStrategy} instead of
* {@link RestartPipelinedRegionStrategy} as the underlying failover strategy. */
private final boolean fastMode;

/** The underlying new generation region failover strategy. */
private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;

public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
this(executionGraph, false);
}

public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph, final boolean fastMode) {
this.executionGraph = checkNotNull(executionGraph);
this.executionVertexVersioner = new ExecutionVertexVersioner();
this.fastMode = fastMode;
}

@Override
Expand Down Expand Up @@ -293,9 +303,16 @@ public void notifyNewVertices(final List<ExecutionJobVertex> newJobVerticesTopol
// otherwise the failover topology will not be correctly built.
// currently it's safe to add it here, as this method is invoked only once in production code.
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());

if (fastMode) {
this.restartPipelinedRegionStrategy = new FastRestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());
} else {
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());
}
}

@Override
Expand All @@ -312,9 +329,15 @@ public String getStrategyName() {
*/
public static class Factory implements FailoverStrategy.Factory {

private final boolean fastMode;

Factory(final boolean fastMode) {
this.fastMode = fastMode;
}

@Override
public FailoverStrategy create(final ExecutionGraph executionGraph) {
return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph);
return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph, fastMode);
}
}
}
Expand Up @@ -41,6 +41,9 @@ public class FailoverStrategyLoader {
/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";

/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG} but use it in fast mode. */
public static final String FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-fast";

/** Config name for the {@link NoOpFailoverStrategy}. */
public static final String NO_OP_FAILOVER_STRATEGY = "noop";

Expand Down Expand Up @@ -70,7 +73,10 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config
return new RestartAllStrategy.Factory();

case PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new AdaptedRestartPipelinedRegionStrategyNG.Factory();
return new AdaptedRestartPipelinedRegionStrategyNG.Factory(false);

case FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new AdaptedRestartPipelinedRegionStrategyNG.Factory(true);

case INDIVIDUAL_RESTART_STRATEGY_NAME:
return new RestartIndividualStrategy.Factory();
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
Expand Down Expand Up @@ -64,6 +65,10 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -83,6 +88,7 @@
/**
* Tests for region failover with multi regions.
*/
@RunWith(Parameterized.class)
public class RegionFailoverITCase extends TestLogger {

private static final int FAIL_BASE = 1000;
Expand All @@ -109,10 +115,21 @@ public class RegionFailoverITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Parameter
public String failoverStrategyName;

@Parameters(name = "[{index}] failover strategy: {0}")
public static Object[] failoverStrategies() {
return new Object[] {
FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
};
}

@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);
configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

// global failover times: 3, region failover times: NUM_OF_RESTARTS
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
Expand Down Expand Up @@ -64,6 +65,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -91,7 +94,6 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -120,6 +122,7 @@
* lost results.
* </ul>
*/
@RunWith(Parameterized.class)
public class BatchFineGrainedRecoveryITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(BatchFineGrainedRecoveryITCase.class);

Expand Down Expand Up @@ -177,11 +180,22 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {

private static GlobalMapFailureTracker failureTracker;

@Parameterized.Parameter
public String failoverStrategyName;

@Parameterized.Parameters(name = "[{index}] failover strategy: {0}")
public static Object[] failoverStrategies() {
return new Object[] {
FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
};
}

@SuppressWarnings("OverlyBroadThrowsClause")
@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, PIPELINED_REGION_RESTART_STRATEGY_NAME);
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);

miniCluster = new TestingMiniCluster(
new Builder()
Expand Down

0 comments on commit ef600d0

Please sign in to comment.