
[FLINK-19693][runtime] Downstream Failover for Approximate Local Recovery #13880

Closed
wants to merge 3 commits

Conversation

@curcur curcur commented Nov 2, 2020

What is the purpose of the change

This PR includes three changes:

  • Enables downstream failover for approximate local recovery. That is, if a task fails, the task itself and all of its downstream tasks restart. This is achieved by reusing the existing RestartPipelinedRegionFailoverStrategy --- treating each individual task connected by ResultPartition.Pipelined_Approximate as a separate region.
  • Exposes the approximate downstream failover flag as an internal feature flag in CheckpointConfig: approximateLocalRecovery
  • Adds ITCases for downstream failover

Brief change log

  • Introduces an attribute "reconnectable" in ResultPartitionType to indicate whether the partition is reconnectable. Note that this is only a temporary solution. It will be removed after:
    1. Approximate local recovery has its own failover strategy to restart the failed set of tasks instead of
      restarting the downstream of failed tasks via {@code RestartPipelinedRegionFailoverStrategy}
    2. FLINK-19895: Unify the life cycle of the ResultPartitionType Pipelined family. There is also a good discussion on this in FLINK-19632.
  • Changes PipelinedRegionComputeUtil#buildRawRegions to build each task connected by PIPELINED_APPROXIMATE as its own region.
  • Changes JobMasterPartitionTrackerImpl#startTrackingPartition to track PIPELINED_APPROXIMATE partitions.
  • Introduces an internal flag approximateLocalRecovery in CheckpointConfig to enable the feature.
  • Adds ITCases (examples) showing how to use approximate downstream failover.
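The region-building idea above can be sketched as follows. This is a simplified, self-contained illustration, not Flink's actual PipelinedRegionComputeUtil: vertices connected through non-reconnectable pipelined partitions are merged into one region via union-find, while reconnectable PIPELINED_APPROXIMATE partitions act as region boundaries, so each such task ends up as its own region.

```java
import java.util.List;

// Simplified sketch (NOT Flink's actual PipelinedRegionComputeUtil):
// non-reconnectable edges glue vertices into one region; reconnectable
// PIPELINED_APPROXIMATE edges act as region boundaries.
public class RegionSketch {

    enum PartitionType {
        PIPELINED(false),
        PIPELINED_APPROXIMATE(true); // reconnectable => region boundary

        final boolean reconnectable;
        PartitionType(boolean reconnectable) { this.reconnectable = reconnectable; }
    }

    record Edge(int from, int to, PartitionType type) {}

    /** Returns a region id per vertex, computed with union-find. */
    static int[] buildRegions(int numVertices, List<Edge> edges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) parent[i] = i;
        for (Edge e : edges) {
            // Only non-reconnectable edges merge vertices into the same region.
            if (!e.type().reconnectable) {
                parent[find(parent, e.from())] = find(parent, e.to());
            }
        }
        int[] region = new int[numVertices];
        for (int i = 0; i < numVertices; i++) region[i] = find(parent, i);
        return region;
    }

    private static int find(int[] p, int x) {
        while (p[x] != x) x = p[x] = p[p[x]]; // path halving
        return x;
    }

    public static void main(String[] args) {
        // 0 -> 1 via PIPELINED, 1 -> 2 via PIPELINED_APPROXIMATE:
        int[] r = buildRegions(3, List.of(
                new Edge(0, 1, PartitionType.PIPELINED),
                new Edge(1, 2, PartitionType.PIPELINED_APPROXIMATE)));
        System.out.println((r[0] == r[1]) + " " + (r[1] != r[2])); // true true
    }
}
```

With this grouping, a failure in vertex 2 only restarts vertex 2's region plus its downstream, matching the "each approximate-connected task is its own region" behavior described above.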

Verifying this change

Unit tests + ITCases

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
    An internal feature only, not ready for public usage.

@flinkbot
Collaborator

flinkbot commented Nov 2, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 55b1489 (Mon Nov 02 11:28:07 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@curcur curcur force-pushed the single_task_scheduler_change_PR branch 3 times, most recently from b9910ec to 158a612 Compare November 2, 2020 11:57
@flinkbot
Collaborator

flinkbot commented Nov 2, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @curcur. The changes themselves look good to me. I had some comments concerning the IT case and the way the PIPELINED_APPROXIMATE result partitions are set. Please take a look.

Comment on lines 721 to 723
if (streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()) {
    return ResultPartitionType.PIPELINED_APPROXIMATE;
}
Contributor

Should this be integrated with the GlobalDataExchangeMode and the shuffleMode as well? Otherwise we need to document that approximate local recovery disables the global data exchange mode setting and only works if shuffleMode is UNDEFINED.

@curcur (Contributor Author) Nov 5, 2020

I've integrated "isApproximateLocalRecoveryEnabled" with GlobalDataExchangeMode in the updated PR in this way:

  • GlobalDataExchangeMode is decided based on whether "isApproximateLocalRecoveryEnabled";
  • Compatibility with ApproximateLocalRecoveryEnabled is also checked here (unaligned checkpoint can not be used together with approximate local recovery for now).
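The decision described above could look roughly like this. This is a hedged, self-contained sketch: the mode name ALL_EDGES_PIPELINED_APPROXIMATE and the error message follow this PR, but the class and method are illustrative, not the actual StreamGraphGenerator code.

```java
// Illustrative sketch (not actual Flink code) of deciding the
// GlobalDataExchangeMode from the approximate-local-recovery flag and
// rejecting the unsupported combination with unaligned checkpoints.
public class ExchangeModeSketch {

    enum GlobalDataExchangeMode { ALL_EDGES_PIPELINED, ALL_EDGES_PIPELINED_APPROXIMATE }

    static GlobalDataExchangeMode decide(
            boolean approximateLocalRecoveryEnabled, boolean unalignedCheckpointsEnabled) {
        if (approximateLocalRecoveryEnabled) {
            // Compatibility check: unaligned checkpoints cannot be combined
            // with approximate local recovery for now.
            if (unalignedCheckpointsEnabled) {
                throw new IllegalStateException(
                        "Approximate Local Recovery and Unaligned Checkpoint "
                                + "can not be used together yet");
            }
            return GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
        }
        return GlobalDataExchangeMode.ALL_EDGES_PIPELINED;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false));  // ALL_EDGES_PIPELINED_APPROXIMATE
        System.out.println(decide(false, false)); // ALL_EDGES_PIPELINED
    }
}
```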

@Before
public void setup() throws Exception {
    Configuration config = new Configuration();
    config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");
Contributor

Why is this required? By default we will have the pipelined region scheduler activated.

@curcur (Contributor Author) Nov 5, 2020

Yes, the "region" strategy does not work with approximate local recovery, for several reasons:

  1. I had a very simple version of restart individual task failover strategy here. In this case, only the failed task restarts. The region scheduling does not work due to the check PipelinedRegionSchedulingStrategy#maybeScheduleRegion -- checkState(areRegionVerticesAllInCreatedState(region)

  2. After switching to the RestartPipelinedRegionFailoverStrategy, it fails during the initialization stage:
    PipelinedRegionSchedulingStrategy#init() --- checkState(partition.getResultType().isBlocking());. Since the regional failover is only going to be used temporarily, I haven't dug into the details, but it seems the scheduling strategy assumes that the result partition has to be a blocking type.

I will probably need to work on this a bit more to make the pipelined region scheduler compatible with approximate local recovery.

I probably need to enforce the scheduling strategy in this case?

Contributor

I think this needs to be resolved. We are planning on removing the legacy scheduler with the next release. Moreover, in the current state users would have to switch the scheduler to use the approximate failover strategy.

@curcur curcur force-pushed the single_task_scheduler_change_PR branch from 848a95d to c580ce7 Compare November 5, 2020 16:35
@curcur (Contributor Author) commented Nov 5, 2020

Hey @tillrohrmann , thank you so much for reviewing the code!

I believe I’ve addressed most of your comments except the “legacy scheduler strategy” problem.

I will read the code of the new regional strategy tomorrow. If the change turns out to be non-trivial, we will need to decide what to do with it.

It is probably not very safe to make non-trivial changes to that critical piece just before the freeze date, especially since I am not familiar with that part of the code.

But no matter what, I would take a look at the code and evaluate the change first.

@curcur (Contributor Author) commented Nov 6, 2020

Hey @tillrohrmann , I think I roughly know why the region scheduler does not work in approximate mode.

Here is what I found: when approximate failover is enabled, only the source task is scheduled for deployment; the consumer tasks are never deployed. So I guess it is related to how regions are used in PipelinedRegionSchedulingStrategy.

  • For pipelined_(bounded), all connected vertices are considered one region; there are no dependency relations between regions, so PipelinedRegionSchedulingStrategy works roughly the same as EagerSchedulingStrategy.
  • For blocking, regions are scheduled after the partitions produced by the regions they depend on become consumable.
  • From this perspective, PipelinedRegionSchedulingStrategy should also work for pipelined_(approximate), but it depends on how "produced partitions are consumable" is notified.

The current version of how “produced partitions are consumable” is notified is very “blocking” specific.

  • Consumer regions may be scheduled upon PipelinedRegionSchedulingStrategy#onExecutionStateChange.
  • First, this requires "executionState == ExecutionState.FINISHED".
  • Second, only FINISHED and FAILED are notifiable in SchedulerBase#updateTaskExecutionState, etc.

I think if we make the "produced partitions are consumable" notification propagate properly for pipelined_approximate, it should work as well.

But the question is whether it is worth the change, because later, once approximate mode has its own restart strategy, we probably won't make each task a region anymore.

In short: with approximate failover, tasks are restarted as individual regions but are expected to be deployed as one region (if connected).
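The deployment problem described above can be modeled with a toy scheduler. This is purely illustrative (not Flink code): a consumer region is only deployed once its producer region reports FINISHED, which a long-running producer connected via pipelined-approximate partitions never does, so the consumers stay undeployed.

```java
import java.util.*;

// Toy model (not Flink code) of the behaviour described above:
// consumer regions are deployed only when the producer region reaches
// FINISHED. A streaming producer never finishes, so its consumers are
// never deployed.
public class SchedulerSketch {

    private final Map<String, List<String>> consumersByProducer = new HashMap<>();
    private final Set<String> deployed = new HashSet<>();

    void addEdge(String producerRegion, String consumerRegion) {
        consumersByProducer
                .computeIfAbsent(producerRegion, k -> new ArrayList<>())
                .add(consumerRegion);
    }

    void startScheduling(List<String> sourceRegions) {
        deployed.addAll(sourceRegions); // only source regions deploy eagerly
    }

    // Mirrors the "blocking-specific" notification: consumers are only
    // considered when the producer reports FINISHED.
    void onExecutionStateChange(String region, String executionState) {
        if ("FINISHED".equals(executionState)) {
            deployed.addAll(consumersByProducer.getOrDefault(region, List.of()));
        }
    }

    boolean isDeployed(String region) { return deployed.contains(region); }

    public static void main(String[] args) {
        SchedulerSketch s = new SchedulerSketch();
        s.addEdge("source", "consumer");
        s.startScheduling(List.of("source"));
        // The streaming source keeps RUNNING forever, so the consumer
        // region is never deployed:
        s.onExecutionStateChange("source", "RUNNING");
        System.out.println(s.isDeployed("source") + " " + s.isDeployed("consumer")); // true false
    }
}
```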

@curcur curcur force-pushed the single_task_scheduler_change_PR branch from c580ce7 to efd6909 Compare November 6, 2020 12:04
@curcur (Contributor Author) commented Nov 6, 2020

Rebased (resolved conflicts) + addressed @pnowojski's suggestion.

@curcur curcur requested a review from pnowojski November 6, 2020 12:05
@tillrohrmann (Contributor) left a comment

Thanks for updating the PR and the analysis for why the pipelined region scheduler does not work with approximate local recovery. I had a couple of additional comments. Please take a look.

Concerning supporting the new scheduler, I would suggest that we enforce that the legacy scheduler is configured when using approximate local recovery. Then we have to file a follow-up issue which adds support for this feature with the pipelined region scheduler, because we are about to remove the legacy scheduler from Flink.

One possible way to solve the problem is to fix https://issues.apache.org/jira/browse/FLINK-19895 and then to allow the pipelined region scheduler to also start scheduling of partial pipelined regions (given that the other part is already running).

checkState(
    !checkpointConfig.isUnalignedCheckpointsEnabled(),
    "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
Contributor

Can we do a similar check for the scheduler configuration? I think we should fail hard if we see that we are not using the legacy scheduler. Moreover, an explanatory exception message would be helpful as well.

@curcur (Contributor Author) Nov 8, 2020

I've tried out different ways to do the check. It is slightly different from the unaligned checkpoint check, because the PipelinedRegionScheduler is configured at the JobMaster level, while approximate local recovery is configured at the job level. (BTW, I can kind of guess why it is a JobMaster-level config, but I am still curious, because I will probably face the same question later when constructing a separate failover strategy. We can discuss this later.)

The check should happen in the same place where either PIPELINED_REGION_SCHEDULING or LEGACY_SCHEDULING is chosen, where the type SchedulingStrategyFactory is decided.

  1. In JobGraph, the most reasonable place to put the config "isApproximateLocalRecoveryEnabled" seems to be JobCheckpointingSettings#CheckpointCoordinatorConfiguration, similar to the unaligned checkpoint's config. However, CheckpointCoordinatorConfiguration, as its name suggests, is for the CheckpointCoordinator and will be serialized to it. In fact, the CheckpointCoordinator does not need isApproximateLocalRecoveryEnabled for anything, and putting it there breaks a lot of tests, so it is probably not a good place for the flag.

  2. So I put isApproximateLocalRecoveryEnabled in a similar place as scheduleMode in JobGraph. It will be removed together with scheduleMode later. This flag is only used to make sure approximate local recovery is not used while JobManagerOptions.SCHEDULING_STRATEGY is set to region.

  3. If JobManagerOptions.SCHEDULING_STRATEGY is set to legacy, the EAGER strategy is enforced in StreamGraphGenerator#configureStreamGraph, but I still put a check there.

} else {
	graph.setStateBackend(stateBackend);
	graph.setScheduleMode(ScheduleMode.EAGER);

	if (checkpointConfig.isApproximateLocalRecoveryEnabled()) {
		checkApproximateLocalRecoveryCompatibility();
		graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
	} else {
		graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
	}
}

Contributor Author

I will also add a test for this.

Contributor

Yes I think that we need to do the check on the cluster/JobMaster-side. At the moment we haven't exposed this configuration on a per-job basis because ideally we only have a single scheduler which is able to serve all workloads.

@curcur curcur force-pushed the single_task_scheduler_change_PR branch from efd6909 to 62bcae9 Compare November 8, 2020 09:56
@curcur curcur force-pushed the single_task_scheduler_change_PR branch 2 times, most recently from 1f68be3 to 1917662 Compare November 8, 2020 11:18
@curcur (Contributor Author) commented Nov 8, 2020

Hey Till @tillrohrmann, thanks so much for reviewing!!

Addressed your comments.

Left some notes on why I put the isApproximateLocalRecoveryEnabled vs. PipelinedRegionScheduler check in its current place.

@curcur curcur force-pushed the single_task_scheduler_change_PR branch from 1917662 to 46b3d4b Compare November 8, 2020 12:12
@curcur (Contributor Author) commented Nov 8, 2020

@flinkbot run azure

@curcur (Contributor Author) commented Nov 8, 2020

Filed a ticket to track this issue:
https://issues.apache.org/jira/browse/FLINK-20048
Make Approximate Local Recovery Compatible With PipelinedRegionSchedulingStrategy

@tillrohrmann (Contributor) left a comment

Thanks for resolving my comments @curcur. The changes look good to me. I will address my last remaining comments myself before merging this PR. Thanks a lot for your good work!

Comment on lines +84 to +86
assertTrue(
    e.getMessage()
        .contains("Approximate local recovery can not be used together with PipelinedRegionScheduler for now"));
Contributor

nit: Could use FlinkMatchers.containsMessage

Comment on lines +99 to +101
assertTrue(
    e.getMessage()
        .contains("Approximate local recovery can only be used together with EAGER schedule mode"));
Contributor

Same here.


@tillrohrmann (Contributor)

Merged via 0b3f15e
