
[FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk #13628

Closed
wants to merge 4 commits

Conversation

zhuzhurk
Contributor

What is the purpose of the change

This PR fixes the issue reported in FLINK-19552: the JobManager can crash if concurrent failures happen.
The root cause is that MergingSharedSlotProfileRetrieverFactory incorrectly retrieves preferred locations for vertices which are not scheduled yet, while some of their producer vertices are canceled and so are their location futures.
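For illustration only (not the actual PR diff): a minimal, self-contained sketch of the filtering idea named in the title, i.e., asking for preferred locations only for executions that belong to the bulk currently being scheduled. The type parameter stands in for Flink's ExecutionVertexID; the class and method names are made up for this sketch.

```java
import java.util.Set;
import java.util.stream.Collectors;

final class BulkFilteringSketch {

    /**
     * Returns the executions of a slot sharing group whose preferred locations should be
     * queried: only those that are part of the bulk being scheduled. Location futures of
     * executions outside the bulk may still be pending, canceled or failed, so they are skipped.
     */
    static <ID> Set<ID> executionsToQuery(Set<ID> executionsInSharingGroup, Set<ID> scheduledBulk) {
        return executionsInSharingGroup.stream()
                .filter(scheduledBulk::contains)
                .collect(Collectors.toSet());
    }
}
```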

Verifying this change

This change added tests and can be verified as follows:

  • UTs in SlotSharingExecutionSlotAllocatorTest and MergingSharedSlotProfileRetrieverTest
  • IT: PipelinedRegionSchedulingConcurrentFailureTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 6187d12 (Wed Oct 14 07:28:15 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

…t allow immediate failed logical slot request

A newly created logical slot request should be either pending, or fulfilled successfully with an available physical slot from the slot pool.
…ringGroup only for its executions from the scheduled bulk

Otherwise the preferred location futures of un-scheduled executions can be pending for a long time, or even canceled/failed, which results in unexpected errors.
@flinkbot
Collaborator

flinkbot commented Oct 14, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@tillrohrmann tillrohrmann self-assigned this Oct 14, 2020
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for creating this fix @zhuzhurk. If I understand the problem correctly, then I think the PR fixes the problem. However, I have a couple of related questions:

  1. PipelinedRegionSchedulingConcurrentFailureTest. testNoImmediateSlotAllocationFailureOnConcurrentFailure seems to be able to reproduce the problem because the RestartPipelinedRegionFailoverStrategy also cancels not yet started downstream consumers. Why is this the case? Shouldn't it only be necessary to cancel Executions which are not CREATED?
  2. ExecutionGraphToInputsLocationsRetrieverAdapter seems to solve the problem of deadlocking the MergingSharedSlotProfileRetriever by returning an input future of an execution which is about to be scheduled by checking on the CREATED state. However, it returns the input location future if the Execution has a different state. To me this seems an odd responsibility of an InputLocationsRetriever. I think it would make more sense to filter such dependencies out at a different level.
  3. Why does MergingSharedSlotProfileRetriever.getSlotProfileFuture return a future? At the time of scheduling a pipelined region, shouldn't all its inputs and their locations be known?
  4. The fix limits the set of input locations to the specified bulk. However, the bulk does not necessarily contain all Executions which can share the slot. Hence, by doing this filtering, we give up potential information about other input locations for tasks which can also run in the same slot. Shouldn't we try to obtain their input locations and only ignore them if we cannot get them?
  5. In PipelinedRegionSchedulingConcurrentFailureTest, it seems as if Executions of v3 and v4 are in the same ExecutionSlotSharingGroup as Executions of v1 and v2. I think this is also the reason why these ExecutionVertices are actually asked for their input locations. Given that v2 and v3 are separated by a blocking data exchange, how can Executions of these vertices share a slot? Wouldn't it also mean that we are asking for too large a slot because SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile includes v1, v2, v3, v4 in one slot?

If there is no strong need for waiting for a future input location, I would say that it would probably be easiest if the MergingSharedSlotProfileRetriever checks all Executions of its ExecutionSlotSharingGroup and collects the available input locations. Based on this information it can then make an approximate decision. That way, it would also ignore currently failed tasks. Moreover, we should check how the ExecutionSlotSharingGroups are calculated. There might be a small inaccuracy in it.

cc @azagrebin

@tillrohrmann
Contributor

tillrohrmann commented Oct 14, 2020

A somewhat related question concerning the ExecutionSlotSharingGroup is the following: How will the ExecutionSlotSharingGroup be calculated if we have the following JobGraph:

v1 --> v2
v3 --> v4

here v1, v2, v3, v4 are JobVertices and --> is a blocking data exchange. Given this specification we can say that v1 and v2 cannot share a slot because of the blocking data exchange. The same applies to v3 and v4. However, v3 or v4 could share the slot with v1. How is this solved? Will the effective execution slot sharing group for v1 contain v3 and v4, or will we say v1 can share with v3 but not v4?
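A hedged sketch of how such a topology can be expressed with the runtime JobGraph API; it assumes JobVertex#connectNewDataSetAsInput with a BLOCKING result partition type (as available in the Flink version discussed here), and the class and vertex names are illustrative only.

```java
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;

final class BlockingTopologySketch {

    static JobVertex[] buildVertices() {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");

        // "-->" in the example above: a blocking data exchange between producer and consumer.
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        return new JobVertex[] {v1, v2, v3, v4};
    }
}
```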

@zhuzhurk
Contributor Author

A somewhat related question concerning the ExecutionSlotSharingGroup is the following: How will the ExecutionSlotSharingGroup be calculated if we have the following JobGraph:

v1 --> v2
v3 --> v4

here v1, v2, v3, v4 are JobVertices and --> is a blocking data exchange. Given this specification we can say that v1 and v2 cannot share a slot because of the blocking data exchange. The same applies to v3 and v4. However, v3 or v4 could share the slot with v1. How is this solved? Will the effective execution slot sharing group for v1 contain v3 and v4, or will we say v1 can share with v3 but not v4?

ExecutionSlotSharingGroup relies on how we set the SlotSharingGroup of each JobVertex. This means that if v1, v2, v3, v4 are in the same SlotSharingGroup, then we can have an ExecutionSlotSharingGroup {ev11, ev21, ev31, ev41}. So I think your question here is about how we set the SlotSharingGroup of job vertices.
The way the default SlotSharingGroup is set for job vertices currently differs between streaming jobs (including DataStream and Table/SQL streaming), blink planner batch jobs, and DataSet jobs:

  • For streaming jobs, all job vertices are by default in the same "default" SlotSharingGroup.
  • For blink planner batch jobs, each logical pipelined region corresponds to a SlotSharingGroup.
  • For DataSet jobs, all job vertices are by default in the same "default" SlotSharingGroup. This is not ideal and I think it should be changed to be aligned with blink planner batch jobs.

Regarding whether we should put v1 and v3 in the same SlotSharingGroup, I don't think it is necessary, and it can complicate things. It is not necessary because their slot sharing will not improve input locality, and unlike for streaming jobs, it does not simplify the resource reasoning.
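A hedged sketch of the difference between the two defaults described above, assuming Flink's runtime JobGraph API (JobVertex#setSlotSharingGroup); the helper class and the explicit group objects are illustrative only.

```java
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

final class SlotSharingDefaultsSketch {

    /** Streaming / DataSet default: all vertices share one group, so an
     *  ExecutionSlotSharingGroup like {ev11, ev21, ev31, ev41} is possible. */
    static void oneGroupForAll(JobVertex... vertices) {
        SlotSharingGroup all = new SlotSharingGroup();
        for (JobVertex vertex : vertices) {
            vertex.setSlotSharingGroup(all);
        }
    }

    /** Blink planner batch default: one group per logical pipelined region.
     *  With only blocking edges, every vertex forms its own region here. */
    static void oneGroupPerRegion(JobVertex... singleVertexRegions) {
        for (JobVertex vertex : singleVertexRegions) {
            vertex.setSlotSharingGroup(new SlotSharingGroup());
        }
    }
}
```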

@zhuzhurk
Contributor Author

zhuzhurk commented Oct 14, 2020

Thanks for creating this fix @zhuzhurk. If I understand the problem correctly, then I think the PR fixes the problem. However, I have a couple of related questions:

  1. PipelinedRegionSchedulingConcurrentFailureTest. testNoImmediateSlotAllocationFailureOnConcurrentFailure seems to be able to reproduce the problem because the RestartPipelinedRegionFailoverStrategy also cancels not yet started downstream consumers. Why is this the case? Shouldn't it only be necessary to cancel Executions which are not CREATED?
  2. ExecutionGraphToInputsLocationsRetrieverAdapter seems to solve the problem of deadlocking the MergingSharedSlotProfileRetriever by returning an input future of an execution which is about to be scheduled by checking on the CREATED state. However, it returns the input location future if the Execution has a different state. To me this seems an odd responsibility of an InputLocationsRetriever. I think it would make more sense to filter such dependencies out at a different level.
  3. Why does MergingSharedSlotProfileRetriever.getSlotProfileFuture return a future? At the time of scheduling a pipelined region, shouldn't all its inputs and their locations be known?
  4. The fix limits the set of input locations to the specified bulk. However, the bulk does not necessarily contain all Executions which can share the slot. Hence, by doing this filtering, we give up potential information about other input locations for tasks which can also run in the same slot. Shouldn't we try to obtain their input locations and only ignore them if we cannot get them?
  5. In PipelinedRegionSchedulingConcurrentFailureTest, it seems as if Executions of v3 and v4 are in the same ExecutionSlotSharingGroup as Executions of v1 and v2. I think this is also the reason why these ExecutionVertices are actually asked for their input locations. Given that v2 and v3 are separated by a blocking data exchange, how can Executions of these vertices share a slot? Wouldn't it also mean that we are asking for too large a slot because SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile includes v1, v2, v3, v4 in one slot?

If there is no strong need for waiting for a future input location, I would say that it would probably be easiest if the MergingSharedSlotProfileRetriever checks all Executions of its ExecutionSlotSharingGroup and collects the available input locations. Based on this information it can then make an approximate decision. That way, it would also ignore currently failed tasks. Moreover, we should check how the ExecutionSlotSharingGroups are calculated. There might be a small inaccuracy in it.

cc @azagrebin

Thanks for the thoughtful comments, @tillrohrmann.
I agree that we can simplify MergingSharedSlotProfileRetriever to directly return a SlotProfile instead of a future, by just collecting the available input locations (see the sketch below). This should be enough for pipelined region scheduling, which schedules a region only if all its input regions have finished.
Regarding the inaccuracy of ExecutionSlotSharingGroups, do #5 and the other comment answer your question?
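A minimal sketch of that direction (PreferredLocationsRetrieverV2 is only a proposed name from this discussion, not existing code): instead of composing and waiting on futures, only the input locations that are already available are collected, and pending, canceled, or failed producers are simply skipped.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AvailableLocationsSketch {

    /** Collects only the locations whose futures have already completed successfully. */
    static <L> List<L> availableLocations(Collection<CompletableFuture<L>> locationFutures) {
        List<L> locations = new ArrayList<>();
        for (CompletableFuture<L> future : locationFutures) {
            // isCompletedExceptionally() also covers cancelled futures.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                locations.add(future.getNow(null));
            }
        }
        return locations;
    }
}
```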

Below are replies to each listed question:

  1. This is an unwanted side effect of restarting all downstream tasks of a failed task to avoid nondeterminism issues. I agree we should skip canceling tasks in the CREATED state. This can also avoid a lot of unnecessary cancellation logs, especially for large-scale batch jobs.
  2. Agreed that it's odd. But given that EAGER/LAZY scheduling will be kept for a while and location futures are still needed for them, maybe we can leave it as is. For pipelined region scheduling, we could introduce PreferredLocationsRetrieverV2 and InputsLocationsRetrieverV2 which directly return locations instead of futures; only available locations would be considered in this case.
  3. If we consider the whole ExecutionSlotSharingGroup, some vertices may not have been scheduled yet, so their inputs may not be decided. However, we can change it to just consider the locations available at that time, as mentioned in #2.
  4. Yes, you are right. It would be better to take those un-scheduled vertices into consideration when calculating preferred input locations.
  5. The test case mocks a DataSet job. Currently, all job vertices of a DataSet job are put into the same SlotSharingGroup. I think we need to change this to assign each logical pipelined region a different slot sharing group, similar to what blink planner batch jobs do. This is something we missed when enabling pipelined region scheduling for all kinds of jobs.

In summary, here are the actions to take:

  1. Fix FLINK-19552 in a different way, i.e., introduce PreferredLocationsRetrieverV2 and InputsLocationsRetrieverV2 which directly return only the available locations.
  2. Change the default SlotSharingGroup setting for DataSet jobs by assigning each logical pipelined region a different slot sharing group.
  3. Change region failover to skip restarting CREATED tasks.

WDYT?

@tillrohrmann
Contributor

Hi @zhuzhurk, thanks for answering my questions so swiftly. I agree with your proposal to fix the problem with the 3 steps.

  1. Fix FLINK-19552 in a different way, i.e., introduce PreferredLocationsRetrieverV2 and InputsLocationsRetrieverV2 which directly return only the available locations.
  2. Change the default SlotSharingGroup setting for DataSet jobs by assigning each logical pipelined region a different slot sharing group.
  3. Change region failover to skip restarting CREATED tasks.

@azagrebin
Contributor

I generally agree with the proposed steps. We should try to take into account the locations of all available producers for a given SlotSharingGroup, hence futures are not needed in the case of pipelined region scheduling.

I have a note about SlotSharingGroup assignment for batch scheduling:

each logical pipelined region has a different SlotSharingGroup

Although this assignment simplifies things, given @tillrohrmann's example with a parallelism of one (as I understand it, v1 --> v2 and v3 --> v4 are basically disjoint pipelines):

v1 --> v2
v3 --> v4

here v1, v2, v3, v4 are JobVertices and --> is a blocking data exchange

the v11 and v31 executions cannot run at the same time if there is only one slot.
I'm not sure how important this is for performance; it probably depends on the use case.
On the other hand, deciding about such fine-grained slot sharing for a bigger example may be a non-trivial task, and it may be up to the user to decide via the API.

@zhuzhurk
Contributor Author

@azagrebin I feel that it is not very necessary to put v1 and v3 in the same SlotSharingGroup. The reasons are:

  1. For streaming jobs, putting all vertices in the same slot sharing group makes it easier for users to decide the job parallelism with regard to the cluster resources. But for batch jobs, this is not needed.
  2. v1 and v3 do not have an input dependency, so putting them into the same slot does not reduce data exchange cost, unlike tasks in the same pipelined region.
  3. If one slot can run v11 and v31 together, it indicates that the resources are actually enough to be split into 2 slots, one for v11 and one for v31.

So I think assigning each logical pipelined region to a different SlotSharingGroup is enough and simpler.
Moreover, as you mentioned, this is just the default behavior, and we can let users set the SlotSharingGroup for tasks via the API if there are strong requirements for that.

@azagrebin
Contributor

  1. v1 and v3 do not have an input dependency, so putting them into the same slot does not reduce data exchange cost, unlike tasks in the same pipelined region.

I meant the execution speed-up from running v1 and v3 at the same time in the same slot (this makes sense without dynamic slot allocation, which is mostly the case now).

  1. If one slot can run v11 and v31 together, it indicates that the resources are actually enough to be split into 2 slots, one for v11 and one for v31.

This is indeed not an issue when we have dynamic slot allocation.

So I think assigning each logical pipelined region to a different SlotSharingGroup is enough and simpler.
Moreover, as you mentioned, this is just the default behavior, and we can let users set the SlotSharingGroup for tasks via the API if there are strong requirements for that.

Agree, it is generally hard for Flink to decide which regions can run together in batch.

@tillrohrmann
Contributor

@zhuzhurk and @azagrebin, have you already created the follow-up issues?

@zhuzhurk
Contributor Author

@tillrohrmann I just opened FLINK-19712 and FLINK-19714 to track them.

@azagrebin
Contributor

Closing in favour of #13730.

@azagrebin azagrebin closed this Nov 3, 2020