Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-13124] Don't forward exceptions when finishing SourceStreamTask #9090

Closed

Conversation

aljoscha
Copy link
Contributor

What is the purpose of the change

Before, exceptions that occurred after cancelling a source (as the
KafkaConsumer did, for example) would make a job fail when attempting a
"stop-with-savepoint". Now we ignore those exceptions.

Brief change log

  • add a isFinished flag in SourceStreamTask
  • check this flag before re-throwing exceptions

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests in SourceStreamTaskTest

Does this pull request potentially affect one of the following parts:

  • Checkpointing, yes, because it fixes stop-with-savepoint

Documentation

  • Does this pull request introduce a new feature? no

@aljoscha aljoscha requested a review from kl0u July 11, 2019 13:46
@aljoscha
Copy link
Contributor Author

cc @knaufk

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 11, 2019

CI report:

@knaufk
Copy link
Contributor

knaufk commented Jul 11, 2019

stop now works with the fix in my test setup. Thanks @aljoscha.

Copy link
Contributor

@kl0u kl0u left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the work @aljoscha ! I had some comments that I left on the PR.

* want to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
*/
private volatile boolean isFinished = false;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to move this flag to the StreamTask. The reason is that although for now it is used only in the SourceStreamTask, know if the job was normally terminated or cancelled is a property relevant to all the StreamTasks.

@@ -147,6 +155,7 @@ protected void cancelTask() {

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the above comments in mind, I would suggest to have a method in the StreamTask like the following:

@VisibleForTesting
 void finish() throws Exception {
		finished = true;
		finishTask(); // this is implemented by subclasses
	}

(needed in the finishingIgnoresExceptions() test).

And with this, the end of the performCheckpoint should become:

if (isRunning && syncSavepointLatch.isSet()) {

			final boolean checkpointWasAcked =
					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();

			if (checkpointWasAcked) {
				finishTask();
			}
		}

@@ -118,7 +124,9 @@ protected void performDefaultAction(ActionContext context) throws Exception {
}

sourceThread.join();
sourceThread.checkThrowSourceExecutionException();
if (!isFinished) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the above comment, this can be a protected getter method for the finished flag in the StreamTask, available to all StreamTasks.

@aljoscha
Copy link
Contributor Author

Master has moved since I opened this PR, the fix still works, with a change, but I want to investigate a bit further because that change on master removes other new behaviour that my PR was testing in one of the test cases.

@aljoscha aljoscha force-pushed the flink13124-stop-with-savepoint-fix branch from 6c22974 to b1330cb Compare July 13, 2019 22:45
@aljoscha
Copy link
Contributor Author

PTAL (please take another look). I now changed this to work with latest master. (there was a change in how the mailbox worked.)

Before, exceptions that occurred after cancelling a source (as the
KafkaConsumer did, for example) would make a job fail when attempting a
"stop-with-savepoint". Now we ignore those exceptions.
@aljoscha aljoscha force-pushed the flink13124-stop-with-savepoint-fix branch from b1330cb to 6b32e06 Compare July 14, 2019 05:23
@kl0u kl0u self-requested a review July 15, 2019 10:15
Copy link
Contributor

@kl0u kl0u left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aljoscha Thanks for the work!

Changes look good for the master, but the changes in the mailbox that you refer to, are not present in the release-1.9 branch. I can merge this PR to the master, but for the release-1.9 you should open a new PR, potentially with the solution included in this PR before your last rebasing.

WDYT?

@aljoscha
Copy link
Contributor Author

ok, thanks! could you please merge? (just arrived in china 😅 )

@kl0u
Copy link
Contributor

kl0u commented Jul 15, 2019

Sure! I will merge now.

@kl0u kl0u closed this Jul 15, 2019
@kl0u
Copy link
Contributor

kl0u commented Jul 15, 2019

Merged on master

// We tell the mailbox to finish, to prevent any exceptions that might occur during
// finishing from leading to a FAILED state. This could happen, for example, when cancelling
// sources as part of a "stop-with-savepoint".
mailboxProcessor.allActionsCompleted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late in this PR.
Is there a reason why in master branch version there is mailboxProcessor.allActionsCompleted(); in finishTask() but in release-1.9 branch doesn't have it (#9115)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the release-1.9 branch StreamTask.cancel() also doesn't use this method, in fact the MailboxProcessor doesn't yet exist and isn't used like this. However, this solution leads to a race condition, see also #9125.

@aljoscha aljoscha deleted the flink13124-stop-with-savepoint-fix branch July 25, 2019 15:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants