Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery #13880

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

/** Whether approximate local recovery is enabled. This flag will be removed together with legacy scheduling strategies. */
private boolean approximateLocalRecovery = false;

// --- checkpointing ---

/** Job specific execution config. */
Expand Down Expand Up @@ -231,6 +234,14 @@ public ScheduleMode getScheduleMode() {
return scheduleMode;
}

/**
 * Switches approximate local recovery on or off for this job graph.
 *
 * @param enabled {@code true} to enable approximate local recovery, {@code false} to disable it
 */
public void enableApproximateLocalRecovery(boolean enabled) {
	approximateLocalRecovery = enabled;
}

/**
 * Returns the approximate local recovery flag of this job graph.
 *
 * @return {@code true} if approximate local recovery has been enabled, {@code false} otherwise
 */
public boolean isApproximateLocalRecoveryEnabled() {
	return this.approximateLocalRecovery;
}

/**
* Sets the savepoint restore settings.
* @param settings The savepoint restore settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Components to create a {@link DefaultScheduler} which depends on the
* configured {@link JobManagerOptions#SCHEDULING_STRATEGY}.
Expand Down Expand Up @@ -82,19 +84,26 @@ ExecutionSlotAllocatorFactory getAllocatorFactory() {

static DefaultSchedulerComponents createSchedulerComponents(
final ScheduleMode scheduleMode,
final boolean isApproximateLocalRecoveryEnabled,
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
final Time slotRequestTimeout) {

final String schedulingStrategy = jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);
switch (schedulingStrategy) {
case PIPELINED_REGION_SCHEDULING:
checkArgument(
!isApproximateLocalRecoveryEnabled,
"Approximate local recovery can not be used together with PipelinedRegionScheduler for now! " +
"Please set JobManagerOptions.SCHEDULING_STRATEGY to legacy.");
return createPipelinedRegionSchedulerComponents(
scheduleMode,
jobMasterConfiguration,
slotPool,
slotRequestTimeout);
case LEGACY_SCHEDULING:
checkArgument(!isApproximateLocalRecoveryEnabled || !scheduleMode.allowLazyDeployment(),
"Approximate local recovery can only be used together with EAGER schedule mode!");
return createLegacySchedulerComponents(
scheduleMode,
jobMasterConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public SchedulerNG createInstance(

final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
jobGraph.getScheduleMode(),
jobGraph.isApproximateLocalRecoveryEnabled(),
jobMasterConfiguration,
slotPool,
slotRequestTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.apache.flink.runtime.jobgraph.ScheduleMode.EAGER;
import static org.apache.flink.runtime.jobgraph.ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests for the factory method {@link DefaultSchedulerComponents#createSchedulerComponents(
Expand Down Expand Up @@ -67,9 +72,56 @@ public void testCreatingPipelinedRegionSchedulingStrategyFactoryByDefault() {
assertThat(components.getSchedulingStrategyFactory(), instanceOf(PipelinedRegionSchedulingStrategy.Factory.class));
}

@Test
public void testCreatingPipelinedRegionSchedulingStrategyFactoryWithApproximateLocalRecovery() {
final Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");

try {
createSchedulerComponents(configuration, true, EAGER);
fail("expected failure");
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains("Approximate local recovery can not be used together with PipelinedRegionScheduler for now"));
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could use FlinkMatchers.containsMessage

}
}

@Test
public void testCreatingLegacySchedulingStrategyFactoryWithApproximateLocalRecoveryInLazyMode() {
final Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");

try {
createSchedulerComponents(configuration, true, LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
fail("expected failure");
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains("Approximate local recovery can only be used together with EAGER schedule mode"));
Comment on lines +99 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}
}

@Test
public void testCreatingLegacySchedulingStrategyFactoryWithApproximateLocalRecoveryInEagerMode() {
	// Legacy scheduling strategy with approximate local recovery enabled and EAGER schedule mode:
	// component creation must succeed and yield the eager scheduling strategy factory.
	final Configuration legacyConfig = new Configuration();
	legacyConfig.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");

	assertThat(
		createSchedulerComponents(legacyConfig, true, EAGER).getSchedulingStrategyFactory(),
		instanceOf(EagerSchedulingStrategy.Factory.class));
}

/**
 * Creates scheduler components for the given configuration using this test's defaults:
 * approximate local recovery disabled and the lazy-from-sources (batch slot request) schedule mode.
 */
private static DefaultSchedulerComponents createSchedulerComponents(final Configuration configuration) {
	final boolean approximateLocalRecoveryEnabled = false;
	final ScheduleMode defaultScheduleMode = LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;
	return createSchedulerComponents(configuration, approximateLocalRecoveryEnabled, defaultScheduleMode);
}

private static DefaultSchedulerComponents createSchedulerComponents(
final Configuration configuration,
boolean iApproximateLocalRecoveryEnabled,
ScheduleMode scheduleMode) {
return DefaultSchedulerComponents.createSchedulerComponents(
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
scheduleMode,
iApproximateLocalRecoveryEnabled,
configuration,
new TestingSlotPoolImpl(new JobID()),
Time.milliseconds(10L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.environment;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobStatus;
Expand Down Expand Up @@ -84,6 +85,9 @@ public class CheckpointConfig implements java.io.Serializable {

private long alignmentTimeout = ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue().toMillis();

/** Flag to enable approximate local recovery. */
private boolean approximateLocalRecovery;

/** Cleanup behaviour for persistent checkpoints. */
private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;

Expand Down Expand Up @@ -122,6 +126,7 @@ public CheckpointConfig(final CheckpointConfig checkpointConfig) {
this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
this.alignmentTimeout = checkpointConfig.alignmentTimeout;
this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
this.forceCheckpointing = checkpointConfig.forceCheckpointing;
this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
Expand Down Expand Up @@ -504,6 +509,35 @@ public long getAlignmentTimeout() {
return alignmentTimeout;
}

/**
 * Tells whether the approximate local recovery mode has been switched on.
 *
 * @return {@code true} when approximate local recovery is enabled, {@code false} otherwise.
 */
@Experimental
public boolean isApproximateLocalRecoveryEnabled() {
	return this.approximateLocalRecovery;
}

/**
 * Turns the approximate local recovery mode on or off.
 *
 * <p>In this recovery mode, when a task fails, the failed task and the entire downstream of
 * that task are restarted.
 *
 * <p>Please note:
 * <ol>
 *   <li>Approximate recovery may lead to data loss. The amount of data which leads the failed
 *       task from the state of the last completed checkpoint to the state when the task fails
 *       is lost.</li>
 *   <li>In the next version, we will support restarting only the set of failed tasks.
 *       In this version, we only support downstream restarts when a task fails.</li>
 *   <li>It is only an internal feature for now.</li>
 * </ol>
 *
 * @param enabled Flag to indicate whether approximate local recovery is enabled.
 */
@Experimental
public void enableApproximateLocalRecovery(boolean enabled) {
	this.approximateLocalRecovery = enabled;
}

/**
* Returns the cleanup behaviour for externalized checkpoints.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,8 @@ public enum GlobalDataExchangeMode {
POINTWISE_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
ALL_EDGES_PIPELINED
ALL_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_APPROXIMATE}. */
ALL_EDGES_PIPELINED_APPROXIMATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,23 @@ private void configureStreamGraph(final StreamGraph graph) {
setBatchStateBackendAndTimerService(graph);
} else {
graph.setStateBackend(stateBackend);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.EAGER);

if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
checkApproximateLocalRecoveryCompatibility();
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
} else {
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
}
}
}

private void checkApproximateLocalRecoveryCompatibility() {
checkState(
!checkpointConfig.isUnalignedCheckpointsEnabled(),
"Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
Comment on lines +302 to +304
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do a similar check for the scheduler configuration? I think we should fail hard if we see that we are not using the legacy scheduler. Moreover, an explanatory exception message would be helpful as well.

Copy link
Contributor Author

@curcur curcur Nov 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried out different ways to do the check. It is slightly different from the unaligned checkpoint check, because the PipelinedRegionScheduler is configured at the JobMaster level, while approximate local recovery is configured at the job level. (BTW, I can kind of guess the reason why it is a JobMaster-level config, but I am still curious, because I will probably have the same question later when I am constructing a separate failover strategy. We can probably discuss this later.)

The check should happen in the same place where either PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is chosen, where the type SchedulingStrategyFactory is decided.

  1. In JobGraph, the most reasonable place to put the config "isApproximateLocalRecoveryEnabled" seems to be JobCheckpointingSettings#CheckpointCoordinatorConfiguration, similar to the unaligned checkpoint config. However, CheckpointCoordinatorConfiguration, as its name suggests, is for the CheckpointCoordinator and will be serialized to the CheckpointCoordinator. But in fact, the CheckpointCoordinator does not need isApproximateLocalRecoveryEnabled for anything, and adding it there breaks a lot of tests, so, at this point, it is probably not a good place to put it.

  2. So I put isApproximateLocalRecoveryEnabled in a similar place as scheduleMode in JobGraph. It will be removed together with scheduleMode later, when scheduleMode is removed. This flag is only used to make sure ApproximateLocalRecovery is not used when JobManagerOptions.SCHEDULING_STRATEGY is set to region.

  3. If JobManagerOptions.SCHEDULING_STRATEGY is set to legacy, EAGER strategy is enforced in StreamGraphGenerator#configureStreamGraph, but I still put a check there.

} else {
	graph.setStateBackend(stateBackend);
	graph.setScheduleMode(ScheduleMode.EAGER);

	if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
		checkApproximateLocalRecoveryCompatibility();
		graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
	} else {
		graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
	}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will also add a test for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that we need to do the check on the cluster/JobMaster-side. At the moment we haven't exposed this configuration on a per-job basis because ideally we only have a single scheduler which is able to serve all workloads.

}

private void setBatchStateBackendAndTimerService(StreamGraph graph) {
boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private JobGraph createJobGraph() {

// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Expand Down Expand Up @@ -743,6 +744,8 @@ private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> pa
}
case ALL_EDGES_PIPELINED:
return ResultPartitionType.PIPELINED_BOUNDED;
case ALL_EDGES_PIPELINED_APPROXIMATE:
return ResultPartitionType.PIPELINED_APPROXIMATE;
default:
throw new RuntimeException("Unrecognized global data exchange mode " + streamGraph.getGlobalDataExchangeMode());
}
Expand Down