Skip to content

Commit

Permalink
[FLINK-15307][failover]Rename Subclasses of FailoverStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
chendonglin committed Jan 14, 2020
1 parent 6f5f23b commit c2f9391
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 54 deletions.
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -60,7 +60,7 @@
import static org.apache.flink.util.Preconditions.checkState;

/**
* This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions.
* This failover strategy uses flip1.RestartPipelinedRegionFailoverStrategy to make task failover decisions.
*/
public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {

Expand All @@ -73,7 +73,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
private final ExecutionVertexVersioner executionVertexVersioner;

/** The underlying new generation region failover strategy. */
private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
private RestartPipelinedRegionFailoverStrategy restartPipelinedRegionFailoverStrategy;

public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
this.executionGraph = checkNotNull(executionGraph);
Expand All @@ -96,7 +96,7 @@ public void onTaskFailure(final Execution taskExecution, final Throwable cause)

final ExecutionVertexID vertexID = getExecutionVertexID(taskExecution.getVertex());

final Set<ExecutionVertexID> tasksToRestart = restartPipelinedRegionStrategy.getTasksNeedingRestart(vertexID, cause);
final Set<ExecutionVertexID> tasksToRestart = restartPipelinedRegionFailoverStrategy.getTasksNeedingRestart(vertexID, cause);
restartTasks(tasksToRestart);
}

Expand Down Expand Up @@ -292,8 +292,8 @@ public void notifyNewVertices(final List<ExecutionJobVertex> newJobVerticesTopol
// build the underlying new generation failover strategy when the executionGraph vertices are all added,
// otherwise the failover topology will not be correctly built.
// currently it's safe to add it here, as this method is invoked only once in production code.
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
checkState(restartPipelinedRegionFailoverStrategy == null, "notifyNewVertices() must be called only once");
this.restartPipelinedRegionFailoverStrategy = new RestartPipelinedRegionFailoverStrategy(
executionGraph.getFailoverTopology(),
executionGraph.getResultPartitionAvailabilityChecker());
}
Expand Down
Expand Up @@ -29,10 +29,10 @@
*/
public final class FailoverStrategyFactoryLoader {

/** Config name for the {@link RestartAllStrategy}. */
/** Config name for the {@link RestartAllFailoverStrategy}. */
public static final String FULL_RESTART_STRATEGY_NAME = "full";

/** Config name for the {@link RestartPipelinedRegionStrategy}. */
/** Config name for the {@link RestartPipelinedRegionFailoverStrategy}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";

private FailoverStrategyFactoryLoader() {
Expand All @@ -57,10 +57,10 @@ public static FailoverStrategy.Factory loadFailoverStrategyFactory(final Configu

switch (strategyParam.toLowerCase()) {
case FULL_RESTART_STRATEGY_NAME:
return new RestartAllStrategy.Factory();
return new RestartAllFailoverStrategy.Factory();

case PIPELINED_REGION_RESTART_STRATEGY_NAME:
return new RestartPipelinedRegionStrategy.Factory();
return new RestartPipelinedRegionFailoverStrategy.Factory();

default:
throw new IllegalConfigurationException("Unknown failover strategy: " + strategyParam);
Expand Down
Expand Up @@ -28,11 +28,11 @@
/**
* A failover strategy that proposes to restart all vertices when a vertex fails.
*/
public class RestartAllStrategy implements FailoverStrategy {
public class RestartAllFailoverStrategy implements FailoverStrategy {

private final FailoverTopology<?, ?> topology;

public RestartAllStrategy(final FailoverTopology<?, ?> topology) {
public RestartAllFailoverStrategy(final FailoverTopology<?, ?> topology) {
this.topology = checkNotNull(topology);
}

Expand All @@ -51,7 +51,7 @@ public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID execution
}

/**
* The factory to instantiate {@link RestartAllStrategy}.
* The factory to instantiate {@link RestartAllFailoverStrategy}.
*/
public static class Factory implements FailoverStrategy.Factory {

Expand All @@ -60,7 +60,7 @@ public FailoverStrategy create(
final FailoverTopology<?, ?> topology,
final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {

return new RestartAllStrategy(topology);
return new RestartAllFailoverStrategy(topology);
}
}
}
Expand Up @@ -42,10 +42,10 @@
* A failover strategy that proposes to restart involved regions when a vertex fails.
* A region is defined by this strategy as tasks that communicate via pipelined data exchange.
*/
public class RestartPipelinedRegionStrategy implements FailoverStrategy {
public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy {

/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class);

/** The topology containing info about all the vertices and result partitions. */
private final FailoverTopology<?, ?> topology;
Expand All @@ -66,7 +66,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
* @param topology containing info about all the vertices and result partitions
*/
@VisibleForTesting
public RestartPipelinedRegionStrategy(FailoverTopology<?, ?> topology) {
public RestartPipelinedRegionFailoverStrategy(FailoverTopology<?, ?> topology) {
this(topology, resultPartitionID -> true);
}

Expand All @@ -76,7 +76,7 @@ public RestartPipelinedRegionStrategy(FailoverTopology<?, ?> topology) {
* @param topology containing info about all the vertices and result partitions
* @param resultPartitionAvailabilityChecker helps to query result partition availability
*/
public RestartPipelinedRegionStrategy(
public RestartPipelinedRegionFailoverStrategy(
FailoverTopology<?, ?> topology,
ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {

Expand Down Expand Up @@ -262,7 +262,7 @@ public void removeResultPartitionFromFailedState(IntermediateResultPartitionID r
}

/**
* The factory to instantiate {@link RestartPipelinedRegionStrategy}.
* The factory to instantiate {@link RestartPipelinedRegionFailoverStrategy}.
*/
public static class Factory implements FailoverStrategy.Factory {

Expand All @@ -271,7 +271,7 @@ public FailoverStrategy create(
final FailoverTopology<?, ?> topology,
final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {

return new RestartPipelinedRegionStrategy(topology, resultPartitionAvailabilityChecker);
return new RestartPipelinedRegionFailoverStrategy(topology, resultPartitionAvailabilityChecker);
}
}
}
Expand Up @@ -59,7 +59,7 @@
/**
* Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
*/
public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger {
public class AdaptedRestartPipelinedRegionFailoverStrategyNGAbortPendingCheckpointsTest extends TestLogger {

private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;

Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionFailoverStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
Expand All @@ -51,7 +51,7 @@
* Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen.
* There can be local+local and local+global concurrent failovers.
*/
public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
public class AdaptedRestartPipelinedRegionFailoverStrategyNGConcurrentFailoverTest extends TestLogger {

private static final JobID TEST_JOB_ID = new JobID();

Expand Down
Expand Up @@ -63,7 +63,7 @@
/**
* Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
*/
public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger {
public class AdaptedRestartPipelinedRegionFailoverStrategyNGFailoverTest extends TestLogger {

private static final JobID TEST_JOB_ID = new JobID();

Expand Down
Expand Up @@ -41,7 +41,7 @@ public void testLoadRestartAllStrategyFactory() {
FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME);
assertThat(
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(config),
instanceOf(RestartAllStrategy.Factory.class));
instanceOf(RestartAllFailoverStrategy.Factory.class));
}

@Test
Expand All @@ -52,15 +52,15 @@ public void testLoadRestartPipelinedRegionStrategyFactory() {
FailoverStrategyFactoryLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
assertThat(
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(config),
instanceOf(RestartPipelinedRegionStrategy.Factory.class));
instanceOf(RestartPipelinedRegionFailoverStrategy.Factory.class));
}

@Test
public void testDefaultFailoverStrategyIsRegion() {
final Configuration config = new Configuration();
assertThat(
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(config),
instanceOf(RestartPipelinedRegionStrategy.Factory.class));
instanceOf(RestartPipelinedRegionFailoverStrategy.Factory.class));
}

@Test(expected = IllegalConfigurationException.class)
Expand Down
Expand Up @@ -29,9 +29,9 @@
import static org.junit.Assert.assertEquals;

/**
* Tests for {@link RestartAllStrategy}.
* Tests for {@link RestartAllFailoverStrategy}.
*/
public class RestartAllStrategyTest extends TestLogger {
public class RestartAllFailoverStrategyTest extends TestLogger {

@Test
public void testGetTasksNeedingRestart() {
Expand All @@ -46,7 +46,7 @@ public void testGetTasksNeedingRestart() {

final TestFailoverTopology topology = topologyBuilder.build();

final RestartAllStrategy strategy = new RestartAllStrategy(topology);
final RestartAllFailoverStrategy strategy = new RestartAllFailoverStrategy(topology);

assertEquals(
new HashSet<>(Arrays.asList(v1.getId(), v2.getId(), v3.getId())),
Expand Down
Expand Up @@ -28,9 +28,9 @@
import static org.junit.Assert.assertSame;

/**
* Tests the failover region building logic of the {@link RestartPipelinedRegionStrategy}.
* Tests the failover region building logic of the {@link RestartPipelinedRegionFailoverStrategy}.
*/
public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
public class RestartPipelinedRegionFailoverStrategyBuildingTest extends TestLogger {

/**
* Tests that validates that a graph with single unconnected vertices works correctly.
Expand All @@ -53,7 +53,7 @@ public void testIndividualVertices() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
Expand Down Expand Up @@ -91,7 +91,7 @@ public void testEmbarrassinglyParallelCase() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down Expand Up @@ -140,7 +140,7 @@ public void testOneComponentViaTwoExchanges() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down Expand Up @@ -188,7 +188,7 @@ public void testOneComponentViaCascadeOfJoins() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
Expand Down Expand Up @@ -237,7 +237,7 @@ public void testOneComponentInstanceFromOneSource() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testTwoComponentsViaBlockingExchange() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down Expand Up @@ -332,7 +332,7 @@ public void testTwoComponentsViaBlockingExchange2() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down Expand Up @@ -386,7 +386,7 @@ public void testMultipleComponentsViaCascadeOfJoins() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
Expand Down Expand Up @@ -432,7 +432,7 @@ public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion r1 = strategy.getFailoverRegion(v1.getId());
FailoverRegion r2 = strategy.getFailoverRegion(v2.getId());
Expand Down Expand Up @@ -474,7 +474,7 @@ public void testBlockingAllToAllTopologyWithCoLocation() throws Exception {
topologyBuilder.setContainsCoLocationConstraints(true);
TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down Expand Up @@ -510,7 +510,7 @@ public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception {
topologyBuilder.setContainsCoLocationConstraints(true);
TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

FailoverRegion ra1 = strategy.getFailoverRegion(va1.getId());
FailoverRegion ra2 = strategy.getFailoverRegion(va2.getId());
Expand Down
Expand Up @@ -37,9 +37,9 @@
import static org.junit.Assert.assertEquals;

/**
* Tests the failure handling logic of the {@link RestartPipelinedRegionStrategy}.
* Tests the failure handling logic of the {@link RestartPipelinedRegionFailoverStrategy}.
*/
public class RestartPipelinedRegionStrategyTest extends TestLogger {
public class RestartPipelinedRegionFailoverStrategyTest extends TestLogger {

/**
* Tests for scenes that a task fails for its own error, in which case the
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testRegionFailoverForRegionInternalErrors() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

// when v1 fails, {v1,v4,v5} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testRegionFailoverForDataConsumptionErrors() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

// when v4 fails to consume data from v1, {v1,v4,v5} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations(
TestFailoverTopology topology = topologyBuilder.build();

TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology, availabilityChecker);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology, availabilityChecker);

IntermediateResultPartitionID rp1ID = v1.getProducedResults().iterator().next().getId();
IntermediateResultPartitionID rp2ID = v2.getProducedResults().iterator().next().getId();
Expand Down Expand Up @@ -372,7 +372,7 @@ public void testRegionFailoverForMultipleVerticesRegions() throws Exception {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

// when v3 fails due to internal error, {v3,v4,v5,v6} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down

0 comments on commit c2f9391

Please sign in to comment.