Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery #13880

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public final class PipelinedRegionComputeUtil {
vertexToRegion.put(vertex, currentRegion);

for (R consumedResult : vertex.getConsumedResults()) {
if (consumedResult.getResultType().isPipelined()) {
// Similar to the BLOCKING ResultPartitionType, each vertex connected through PIPELINED_APPROXIMATE
// is also considered as a single region. This attribute is called "reconnectable".
// reconnectable will be removed after FLINK-19895, see also {@link ResultPartitionType#isReconnectable}
if (!consumedResult.getResultType().isReconnectable()) {
curcur marked this conversation as resolved.
Show resolved Hide resolved
final V producerVertex = consumedResult.getProducer();
final Set<V> producerRegion = vertexToRegion.get(producerVertex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPar
Preconditions.checkNotNull(producingTaskExecutorId);
Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);

// only blocking partitions require explicit release call
if (!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
// blocking and PIPELINED_APPROXIMATE partitions require explicit partition release calls
// reconnectable will be removed after FLINK-19895, see also {@link ResultPartitionType#isReconnectable}.
if (!resultPartitionDeploymentDescriptor.getPartitionType().isReconnectable()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum ResultPartitionType {
* {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
* that the partition is no longer needed.
*/
BLOCKING(false, false, false, false),
BLOCKING(false, false, false, false, true),

/**
* BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have
Expand All @@ -47,7 +47,7 @@ public enum ResultPartitionType {
* scenarios, like when the TaskManager exits or when the TaskManager looses connection
* to JobManager / ResourceManager for too long.
*/
BLOCKING_PERSISTENT(false, false, false, true),
BLOCKING_PERSISTENT(false, false, false, true, true),

/**
* A pipelined streaming data exchange. This is applicable to both bounded and unbounded streams.
Expand All @@ -58,7 +58,7 @@ public enum ResultPartitionType {
* <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
* the {@link #PIPELINED_BOUNDED} variant.
*/
PIPELINED(true, true, false, false),
PIPELINED(true, true, false, false, false),

/**
* Pipelined partitions with a bounded (local) buffer pool.
Expand All @@ -71,17 +71,17 @@ public enum ResultPartitionType {
* <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
* no checkpoint barriers.
*/
PIPELINED_BOUNDED(true, true, true, false),
PIPELINED_BOUNDED(true, true, true, false, false),

/**
* Pipelined partitions with a bounded (local) buffer pool to support downstream task to
* continue consuming data after reconnection in Approximate Local-Recovery.
*
* <p>Pipelined results can be consumed only once by a single consumer at one time.
* {@link #PIPELINED_APPROXIMATE} is different from {@link #PIPELINED_BOUNDED} in that
* {@link #PIPELINED_APPROXIMATE} is not decomposed automatically after consumption.
* {@link #PIPELINED_APPROXIMATE} is different from {@link #PIPELINED} and {@link #PIPELINED_BOUNDED} in that
* {@link #PIPELINED_APPROXIMATE} partition can be reconnected after down stream task fails.
*/
PIPELINED_APPROXIMATE(true, true, true, true);
PIPELINED_APPROXIMATE(true, true, true, false, true);

/** Can the partition be consumed while being produced? */
private final boolean isPipelined;
Expand All @@ -95,14 +95,32 @@ public enum ResultPartitionType {
/** This partition will not be released after consuming if 'isPersistent' is true. */
private final boolean isPersistent;

/**
* Can the partition be reconnected.
*
* <p>Attention: this attribute is introduced temporally for ResultPartitionType.PIPELINED_APPROXIMATE
* It will be removed afterwards:
* TODO:
* 1. Approximate local recovery has its won failover strategy to restart the failed set of tasks instead of
* restarting downstream of failed tasks depending on {@code RestartPipelinedRegionFailoverStrategy}
* 2. FLINK-19895: Unify the life cycle of ResultPartitionType Pipelined Family
*/
private final boolean isReconnectable;

/**
* Specifies the behaviour of an intermediate result partition at runtime.
*/
ResultPartitionType(boolean isPipelined, boolean hasBackPressure, boolean isBounded, boolean isPersistent) {
ResultPartitionType(
boolean isPipelined,
boolean hasBackPressure,
boolean isBounded,
boolean isPersistent,
boolean isReconnectable) {
this.isPipelined = isPipelined;
this.hasBackPressure = hasBackPressure;
this.isBounded = isBounded;
this.isPersistent = isPersistent;
this.isReconnectable = isReconnectable;
}

public boolean hasBackPressure() {
Expand All @@ -117,6 +135,10 @@ public boolean isPipelined() {
return isPipelined;
}

public boolean isReconnectable() {
return isReconnectable;
}

/**
* Whether this partition uses a limited number of (network) buffers or not.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

/** Whether approximate local recovery is enabled. This flag will be removed together with legacy scheduling strategies. */
private boolean approximateLocalRecovery = false;

// --- checkpointing ---

/** Job specific execution config. */
Expand Down Expand Up @@ -231,6 +234,14 @@ public ScheduleMode getScheduleMode() {
return scheduleMode;
}

public void enableApproximateLocalRecovery(boolean enabled) {
this.approximateLocalRecovery = enabled;
}

public boolean isApproximateLocalRecoveryEnabled() {
return approximateLocalRecovery;
}

/**
* Sets the savepoint restore settings.
* @param settings The savepoint restore settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Components to create a {@link DefaultScheduler} which depends on the
* configured {@link JobManagerOptions#SCHEDULING_STRATEGY}.
Expand Down Expand Up @@ -82,19 +84,26 @@ ExecutionSlotAllocatorFactory getAllocatorFactory() {

static DefaultSchedulerComponents createSchedulerComponents(
final ScheduleMode scheduleMode,
final boolean isApproximateLocalRecoveryEnabled,
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
final Time slotRequestTimeout) {

final String schedulingStrategy = jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);
switch (schedulingStrategy) {
case PIPELINED_REGION_SCHEDULING:
checkArgument(
!isApproximateLocalRecoveryEnabled,
"Approximate local recovery can not be used together with PipelinedRegionScheduler for now! " +
"Please set JobManagerOptions.SCHEDULING_STRATEGY to legacy.");
return createPipelinedRegionSchedulerComponents(
scheduleMode,
jobMasterConfiguration,
slotPool,
slotRequestTimeout);
case LEGACY_SCHEDULING:
checkArgument(!isApproximateLocalRecoveryEnabled || !scheduleMode.allowLazyDeployment(),
"Approximate local recovery can only be used together with EAGER schedule mode!");
return createLegacySchedulerComponents(
scheduleMode,
jobMasterConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public SchedulerNG createInstance(

final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
jobGraph.getScheduleMode(),
jobGraph.isApproximateLocalRecoveryEnabled(),
jobMasterConfiguration,
slotPool,
slotRequestTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,31 @@ public void testCyclicDependentRegionsAreMerged() {
assertSameRegion(r1, r2, r3, r4);
}

@Test
public void testPipelinedApproximateDifferentRegions() {
TestingSchedulingTopology topology = new TestingSchedulingTopology();

TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex();
TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex();
TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex();
TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex();

topology
.connect(v1, v2, ResultPartitionType.PIPELINED_APPROXIMATE)
.connect(v1, v3, ResultPartitionType.PIPELINED_APPROXIMATE)
.connect(v2, v4, ResultPartitionType.PIPELINED_APPROXIMATE)
.connect(v3, v4, ResultPartitionType.PIPELINED_APPROXIMATE);

Map<ExecutionVertexID, Set<SchedulingExecutionVertex>> pipelinedRegionByVertex = computePipelinedRegionByVertex(topology);

Set<SchedulingExecutionVertex> r1 = pipelinedRegionByVertex.get(v1.getId());
Set<SchedulingExecutionVertex> r2 = pipelinedRegionByVertex.get(v2.getId());
Set<SchedulingExecutionVertex> r3 = pipelinedRegionByVertex.get(v3.getId());
Set<SchedulingExecutionVertex> r4 = pipelinedRegionByVertex.get(v4.getId());

assertDistinctRegions(r1, r2, r3, r4);
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations(
IntermediateResultPartitionID rp2ID = v2.getProducedResults().iterator().next().getId();

// -------------------------------------------------
// Combination1: (rp1 == available, rp == available)
// Combination1: (rp1 == available, rp2 == available)
// -------------------------------------------------
availabilityChecker.failedPartitions.clear();

Expand All @@ -191,7 +191,7 @@ public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations(
verifyThatFailedExecution(strategy, v3).restarts(v3);

// -------------------------------------------------
// Combination2: (rp1 == unavailable, rp == available)
// Combination2: (rp1 == unavailable, rp2 == available)
// -------------------------------------------------
availabilityChecker.failedPartitions.clear();
availabilityChecker.markResultPartitionFailed(rp1ID);
Expand All @@ -201,7 +201,7 @@ public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations(
verifyThatFailedExecution(strategy, v3).restarts(v1, v3);

// -------------------------------------------------
// Combination3: (rp1 == available, rp == unavailable)
// Combination3: (rp1 == available, rp2 == unavailable)
// -------------------------------------------------
availabilityChecker.failedPartitions.clear();
availabilityChecker.markResultPartitionFailed(rp2ID);
Expand Down Expand Up @@ -269,7 +269,7 @@ public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
* |
* (blocking)
* </pre>
* Component 1: 1,2; component 2: 3,4; component 3: 5,6
* Component 1: 1; component 2: 2
*/
@Test
public void testRegionFailoverDoesNotRestartCreatedExecutions() {
Expand All @@ -287,6 +287,36 @@ public void testRegionFailoverDoesNotRestartCreatedExecutions() {
verifyThatFailedExecution(strategy, v2).partitionConnectionCause(v1out).restarts();
}

/**
* Tests approximate local recovery downstream failover .
* <pre>
* (v1) -----> (v2) -----> (v4)
* | ^
* |--------> (v3) --------|
* </pre>
*/
@Test
public void testRegionFailoverForPipelinedApproximate() {
final TestingSchedulingTopology topology = new TestingSchedulingTopology();

TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.RUNNING);
TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.RUNNING);
TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);

topology.connect(v1, v2, ResultPartitionType.PIPELINED_APPROXIMATE);
topology.connect(v1, v3, ResultPartitionType.PIPELINED_APPROXIMATE);
topology.connect(v2, v4, ResultPartitionType.PIPELINED_APPROXIMATE);
topology.connect(v3, v4, ResultPartitionType.PIPELINED_APPROXIMATE);

RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy(topology);

verifyThatFailedExecution(strategy, v1).restarts(v1, v2, v3, v4);
verifyThatFailedExecution(strategy, v2).restarts(v2, v4);
verifyThatFailedExecution(strategy, v3).restarts(v3, v4);
verifyThatFailedExecution(strategy, v4).restarts(v4);
}

private static VerificationContext verifyThatFailedExecution(
FailoverStrategy strategy,
SchedulingExecutionVertex executionVertex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public void testBlockingPartitionIsTracked() {
testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
}

@Test
public void testPipelinedApproximatePartitionIsTracked() {
testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED_APPROXIMATE);
}

private static void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl(
new JobID(),
Expand All @@ -76,7 +81,7 @@ private static void testReleaseOnConsumptionHandling(ResultPartitionType resultP
resultPartitionType,
false));

assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking()));
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isReconnectable()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.apache.flink.runtime.jobgraph.ScheduleMode.EAGER;
import static org.apache.flink.runtime.jobgraph.ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests for the factory method {@link DefaultSchedulerComponents#createSchedulerComponents(
Expand Down Expand Up @@ -67,9 +72,56 @@ public void testCreatingPipelinedRegionSchedulingStrategyFactoryByDefault() {
assertThat(components.getSchedulingStrategyFactory(), instanceOf(PipelinedRegionSchedulingStrategy.Factory.class));
}

@Test
public void testCreatingPipelinedRegionSchedulingStrategyFactoryWithApproximateLocalRecovery() {
final Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");

try {
createSchedulerComponents(configuration, true, EAGER);
fail("expected failure");
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains("Approximate local recovery can not be used together with PipelinedRegionScheduler for now"));
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could use FlinkMatchers.containsMessage

}
}

@Test
public void testCreatingLegacySchedulingStrategyFactoryWithApproximateLocalRecoveryInLazyMode() {
final Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");

try {
createSchedulerComponents(configuration, true, LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
fail("expected failure");
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains("Approximate local recovery can only be used together with EAGER schedule mode"));
Comment on lines +99 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}
}

@Test
public void testCreatingLegacySchedulingStrategyFactoryWithApproximateLocalRecoveryInEagerMode() {
final Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");

final DefaultSchedulerComponents components = createSchedulerComponents(configuration, true, EAGER);
assertThat(components.getSchedulingStrategyFactory(), instanceOf(EagerSchedulingStrategy.Factory.class));
}

private static DefaultSchedulerComponents createSchedulerComponents(final Configuration configuration) {
return createSchedulerComponents(configuration, false, LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
}

private static DefaultSchedulerComponents createSchedulerComponents(
final Configuration configuration,
boolean iApproximateLocalRecoveryEnabled,
ScheduleMode scheduleMode) {
return DefaultSchedulerComponents.createSchedulerComponents(
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
scheduleMode,
iApproximateLocalRecoveryEnabled,
configuration,
new TestingSlotPoolImpl(new JobID()),
Time.milliseconds(10L));
Expand Down