Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21400] Store attempt numbers outside ExecutionGraph #15019

Merged
merged 8 commits into from
Mar 12, 2021

Conversation

zentol
Copy link
Contributor

@zentol zentol commented Feb 25, 2021

Introduces a data structure to store the attempt numbers outside the ExecutionGraph. It's really just a simple Map<ExecutionVertexID, Integer> which ties a specific vertex+subtask to an attempt count.

Counts are set when an execution is registered at the EG, and retrieved when the ExecutionVertex creates a new Execution. The current attempt count is also still stored in the Execution, making the change less invasive (for example, resetForNewExecution continues to work without modifications).

One thing is that, as is, the semantics when it comes to rescaling are a bit funky.
ScaleUp:
If you begin with p=1 and an attempt count of 4, and then rescale to p=2, then what should the attempt count be for both subtasks?
In this version the attempt count for subtask 1 would be retained, while subtask 2 starts at 0.
Setting both to 0 would also make sense, but if we downscale again to p=1 then it would be nice if the attempt count had some relation to the original count.
Alternatively we could try to derive the attempt count for subtask 2 from other subtasks; in this example the obvious choice would be 2, because we're just replicating subtask 1.

ScaleDown:
The main issue arises when scaling down where the subtask with the largest index has the highest attempt count; currently this count would be lost. So you have p=2, and subtask 2 has an attempt count of 4, and now you scale down to p=1. The attempt count would now be solely determined by subtask 1, although we in essence just merged the two.

Overall, I don't think resetting attempt counts to 0 is an option, because they can be used to gauge the health of a vertex, and we'd run into collisions within metrics if we ever re-use a subtask+attempt combination.

The current approach is by far the simplest, and is the only option iff we want to adhere to these rules:

  • every combination of subtask + attempt count is only used once
  • the attempt counts for a given subtask over time always form a continuous series starting at 0

But I'm quite interested in what other people think about this.

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 021332c (Thu Feb 25 08:53:57 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 25, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@zentol
Copy link
Contributor Author

zentol commented Feb 25, 2021

One issues is that the semantics for the attempt numbers as a whole are not really well-defined; internally we use it to differentiate different deployments (for example, when we tried to create deterministic ExecutionAttemptIDs), but users probably think more of it like "how often did this subtask fail".

I'm not even sure how valuable the attempt number is as a whole; I'd think that users either want to identify instable TaskManagers (where the question is "how many failures occur on this TM") or operators ("how often do subtasks of this operator fail?"), but the attempt number fulfills neither because it is also incremented for subtasks did not actually cause a failure.

@tillrohrmann
Copy link
Contributor

The semantics of the execution attempts is a very good question. I think I would keep it as you've implemented it in the PR: We keep monotonously increasing attempt counters for all ever seen Executions.

  • In case of a scale up, new Executions with an attempt number 0 will be inserted
  • In case of a scale down, some of the Executions won't be restarted and, thus, their attempt counter stays as is
  • Increasing the attempt counter for those Executions which will be restarted.

Concerning the implementation, have you considered not introducing the counter to the ExecutionGraph but rather maintaining it outside? What this would require is the following:

  1. A way to give an immutable map of attempts to the EG when it is created in order to initialize the Executions.
  2. A way to retrieve the set of last attempt counters when the EG is terminated. This value could be used to update the external attempt counter.

@zentol zentol changed the title [DRAFT][FLINK-21400] Store attempt numbers outside ExecutionGraph [FLINK-21400] Store attempt numbers outside ExecutionGraph Mar 8, 2021
@zentol
Copy link
Contributor Author

zentol commented Mar 8, 2021

The PR now contains a proper implementation.

@@ -116,7 +116,7 @@ public void setUp() throws Exception {
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(clusterConfig)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.setNumberSlotsPerTaskManager(2)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is essentially the same issue as FLINK-21428

pom.xml Outdated
@@ -939,6 +939,9 @@ under the License.

<profile>
<id>enable-adaptive-scheduler</id>
<activation>
<jdk>[8,)</jdk>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is temporary and just for demonstration purposes

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating this PR @zentol. LGTM. I had few minor comments. +1 for merging after resolving them.

@zentol zentol force-pushed the 21400 branch 2 times, most recently from cdfd953 to 8f3d2b5 Compare March 11, 2021 16:20
@zentol zentol merged commit ce6fd3e into apache:master Mar 12, 2021
@zentol zentol deleted the 21400 branch March 19, 2021 08:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants