Skip to content

[FLINK-21258] Add Canceling state for DeclarativeScheduler#14909

Closed
rmetzger wants to merge 3 commits intoapache:masterfrom
rmetzger:FLINK-21258-cancelling
Closed

[FLINK-21258] Add Canceling state for DeclarativeScheduler#14909
rmetzger wants to merge 3 commits intoapache:masterfrom
rmetzger:FLINK-21258-cancelling

Conversation

@rmetzger
Copy link
Copy Markdown
Contributor

@rmetzger rmetzger commented Feb 9, 2021

This PR is based on #14879

What is the purpose of the change

Declarative Scheduler consists of a number of internal states.

Note that this change is currently not usable as-is, as the other parts of declarative scheduler are not merged yet (See for the prototype this PR is based on: https://github.com/tillrohrmann/flink/tree/declarative-scheduler)

Verifying this change

  • The change is adding unit tests.
  • Note that integration tests for the declarative scheduler will cover additional functionality.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

Will be handled in a separate PR.

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Feb 9, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit b1053be (Tue Feb 09 11:32:29 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Feb 9, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating this PR @rmetzger. The changes look already quite good. I had a couple of comments concerning the tests. Please take a look.

public void testTransitionToFinishedWhenCancellationCompletes() throws Exception {
try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
Canceling canceling = createCancelingState(ctx);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we don't have to call onEnter?

@Test
public void testTransitionToFinishedWhenCancellationCompletes() throws Exception {
try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
Canceling canceling = createCancelingState(ctx);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hidden why this test works. I assume it is the case because the ExecutionGraph does not contain any Executions which are in DEPLOYING or RUNNING state. That's why a ExecutionGraph.cancel can directly cancel all Executions. Maybe we can make this a bit more explicit.

public void testTransitionToSuspend() throws Exception {
try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
Canceling canceling = createCancelingState(ctx);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that you don't call onEnter because it would make the state go to Finished. I think that we should stick to the convention of State and call onEnter. Otherwise we risk that we are testing something which is impossible to reach in production. The proper solution would be to ensure that the ExecutionGraph does not directly go to CANCELLED when onEnter is called.

ExecutionState.FAILED,
new RuntimeException()));
canceling.updateTaskExecutionState(update);
ctx.assertNoStateTransition();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also assert that the ExecutionGraph has been updated. Otherwise a passing implementation of the Canceling.updateTaskExecutionState could be an empty method.

Technically, we would also have to do this for the other StateWithExecutionGraph sub classes.

Comment on lines +167 to +185
public void notifyGlobalFailure(Throwable t) {
canceling.handleGlobalFailure(t);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For which test do we need this implementation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the testTaskFailuresAreIgnored() test.

I'm actually proposing to remove that test. It doesn't belong here. We need to test the proper handling of the updateTaskExecutionState() on the DeclarativeScheduler itself (which registers an InternalFailuresListener and forwards failures to the handleGlobalFailure() method of the State).
Since we are testing the handleGlobalFailure() in a unit test, having a test for updateTaskExecutionState() here doesn't make much sense.

@zentol: Can we add such a test to the DeclarativeScheduler skeleton PR?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not mistaken, then we don't need an InternalFailuresListener for testing the updateTaskExecutionState. I thought that his listener is only used when a failure in some other operation (e.g. deploy occurs). Testing that updateTaskExecutionState stays in the CANCELING state makes sense to me.

@rmetzger
Copy link
Copy Markdown
Contributor Author

I rebased this change to the latest master (thus fewer commits are included), and addressed all comments
Once you confirm that the testTaskFailuresAreIgnored() doesn't make sense, I'll remove it.

I once again introduced a MockExecutionGraph for this test. But extracting an interface from the ExecutionGraph is a really involved change that would not be worth it for my needs in this PR.
It is easier to introduce a ExecutionGraph interface in a separate change.

Copy link
Copy Markdown
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating this PR @rmetzger. I think it makes sense to keep testTaskFailuresAreIgnored because we want to ensure that we don't leave the state on a failed task state update. Moreover, I think we shouldn't need TestInternalFailuresListener.

@rmetzger
Copy link
Copy Markdown
Contributor Author

Thanks for your review!

I'll remove the TestInternalFailuresListener and mock the ExecutionGraph to intercept the failGlobal() method, then merge the change.

@rmetzger rmetzger closed this in fe41328 Feb 11, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants