Skip to content

Commit

Permalink
[FLINK-13056][runtime] Enabled fast region failover strategy for lega…
Browse files Browse the repository at this point in the history
…cy scheduler
  • Loading branch information
zhuzhurk committed Oct 29, 2019
1 parent 176de01 commit bea922d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/job_manager_configuration.html
Expand Up @@ -20,7 +20,7 @@
<tr>
<td><h5>jobmanager.execution.failover-strategy</h5></td>
<td style="word-wrap: break-word;">"full"</td>
<td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks to recover the job.</li><li>'region': Restarts all tasks that could be affected by the task failure. More details can be found <a href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
<td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks to recover the job.</li><li>'region': Restarts all tasks that could be affected by the task failure. More details can be found <a href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li><li>'region-fast': It behaves the same as 'region' but has better performance to determine tasks to restart. This improvement would help if the job scale is large. The side effect is longer region building time and more memory for cache.</li></ul></td>
</tr>
<tr>
<td><h5>jobmanager.heap.size</h5></td>
Expand Down
Expand Up @@ -124,7 +124,10 @@ public class JobManagerOptions {
"More details can be found %s.",
link(
"../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy",
"here"))
"here")),
text("'region-fast': It behaves the same as 'region' but has better performance to " +
"determine tasks to restart. This improvement would help if the job scale is large. " +
"The side effect is longer region building time and more memory for cache.")
).build());

/**
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
import org.apache.flink.runtime.executiongraph.failover.flip1.FastRestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
Expand Down Expand Up @@ -72,12 +73,21 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
/** The versioner helps to maintain execution vertex versions. */
private final ExecutionVertexVersioner executionVertexVersioner;

/** Whether to use the {@link FastRestartPipelinedRegionStrategy} instead of
* {@link RestartPipelinedRegionStrategy} as the underlying failover strategy. */
private final boolean fastMode;

/** The underlying new generation region failover strategy. */
private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;

/**
 * Creates the strategy with fast mode disabled, by delegating to the two-argument
 * constructor with {@code fastMode = false}.
 *
 * @param executionGraph the execution graph this failover strategy works on
 */
public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
this(executionGraph, false);
}

/**
 * Creates the strategy for the given execution graph.
 *
 * <p>The underlying region failover strategy is not created here; it is instantiated
 * later in {@code notifyNewVertices()}, which picks the implementation based on
 * {@code fastMode}.
 *
 * @param executionGraph the execution graph this failover strategy works on; validated
 *     with {@code checkNotNull}
 * @param fastMode if {@code true}, a {@link FastRestartPipelinedRegionStrategy} is used as
 *     the underlying strategy, otherwise a plain {@link RestartPipelinedRegionStrategy}
 */
public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph, final boolean fastMode) {
this.executionGraph = checkNotNull(executionGraph);
// a fresh versioner is created per strategy instance
this.executionVertexVersioner = new ExecutionVertexVersioner();
this.fastMode = fastMode;
}

@Override
Expand Down Expand Up @@ -293,9 +303,16 @@ public void notifyNewVertices(final List<ExecutionJobVertex> newJobVerticesTopol
// otherwise the failover topology will not be correctly built.
// currently it's safe to add it here, as this method is invoked only once in production code.
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());

if (fastMode) {
this.restartPipelinedRegionStrategy = new FastRestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());
} else {
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());
}
}

@Override
Expand All @@ -312,9 +329,15 @@ public String getStrategyName() {
*/
public static class Factory implements FailoverStrategy.Factory {

/** Fast-mode flag handed to the two-argument strategy constructor in {@code create()}. */
private final boolean fastMode;

/**
 * Creates a factory with the given fast-mode setting.
 *
 * @param fastMode whether strategies created by this factory should run in fast mode
 */
Factory(final boolean fastMode) {
this.fastMode = fastMode;
}

@Override
public FailoverStrategy create(final ExecutionGraph executionGraph) {
return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph);
return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph, fastMode);
}
}
}
Expand Up @@ -41,6 +41,9 @@ public class FailoverStrategyLoader {
/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";

/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG} running in fast mode. */
public static final String FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-fast";

/** Config name for the {@link NoOpFailoverStrategy}. */
public static final String NO_OP_FAILOVER_STRATEGY = "noop";

Expand Down Expand Up @@ -70,7 +73,10 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config
return new RestartAllStrategy.Factory();

case PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new AdaptedRestartPipelinedRegionStrategyNG.Factory();
return new AdaptedRestartPipelinedRegionStrategyNG.Factory(false);

case FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new AdaptedRestartPipelinedRegionStrategyNG.Factory(true);

case INDIVIDUAL_RESTART_STRATEGY_NAME:
return new RestartIndividualStrategy.Factory();
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
Expand Down Expand Up @@ -64,6 +65,10 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -83,6 +88,7 @@
/**
* Tests for region failover with multi regions.
*/
@RunWith(Parameterized.class)
public class RegionFailoverITCase extends TestLogger {

private static final int FAIL_BASE = 1000;
Expand All @@ -109,10 +115,21 @@ public class RegionFailoverITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Parameter
public String failoverStrategyName;

/**
 * Supplies the parameter values for this parameterized test: the config name of the
 * pipelined region failover strategy and the config name of its fast variant.
 *
 * @return the failover strategy config names to run the tests with
 */
@Parameters(name = "[{index}] failover strategy: {0}")
public static Object[] failoverStrategies() {
return new Object[] {
FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
};
}

@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);
configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

// global failover times: 3, region failover times: NUM_OF_RESTARTS
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
Expand Down Expand Up @@ -64,6 +65,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -91,7 +94,6 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -120,6 +122,7 @@
* lost results.
* </ul>
*/
@RunWith(Parameterized.class)
public class BatchFineGrainedRecoveryITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(BatchFineGrainedRecoveryITCase.class);

Expand Down Expand Up @@ -177,11 +180,22 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {

private static GlobalMapFailureTracker failureTracker;

@Parameterized.Parameter
public String failoverStrategyName;

/**
 * Supplies the parameter values for this parameterized test: the config name of the
 * pipelined region failover strategy and the config name of its fast variant.
 *
 * @return the failover strategy config names to run the tests with
 */
@Parameterized.Parameters(name = "[{index}] failover strategy: {0}")
public static Object[] failoverStrategies() {
return new Object[] {
FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME,
FailoverStrategyLoader.FAST_PIPELINED_REGION_RESTART_STRATEGY_NAME
};
}

@SuppressWarnings("OverlyBroadThrowsClause")
@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, PIPELINED_REGION_RESTART_STRATEGY_NAME);
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategyName);

miniCluster = new TestingMiniCluster(
new Builder()
Expand Down

0 comments on commit bea922d

Please sign in to comment.