Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11044] Add support for gracefully aborting workers. #12994

Merged
merged 8 commits into from
Oct 8, 2020

Conversation

tudorm
Copy link
Contributor

@tudorm tudorm commented Oct 1, 2020

On complete_work_status received (by the work progress updater thread), abort the corresponding map task executor thread. This relies on Thread.interrupt() to unblock the worker thread if blocked and raise an exception -- the exception is actually ignored by the backend since the backend already decided to abort this thread. Must also check the current thread for interrupts in the valuesiterator when it consumes a stream of values behind the same key, else control does not return to the runReadLoop() to check the interruption.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

…received (by the work progress updater thread), abort the corresponding map task executor thread. This relies on Thread.interrupt() to unblock the worker thread if blocked and raise an exception -- the exception is actually ignored by the backend since the backend already decided to abort this thread. Must also check the current thread for interrupts in the valuesiterator when it consumes a stream of values behind the same key, else control does not return to the runReadLoop() to check the interruption.
@tudorm
Copy link
Contributor Author

tudorm commented Oct 1, 2020

R: @kennknowles

@kennknowles kennknowles self-requested a review October 1, 2020 23:24
@tudorm
Copy link
Contributor Author

tudorm commented Oct 2, 2020

retest this please

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. One nit about error messages when the test fails. After that, I am happy to squash the commits and run spotlessApply to satisfy the linter if the PR is set up to allow maintainers to edit.

executor.execute();
fail("Should have aborted");
} catch (Exception e) {
Assert.assertTrue(e instanceof InterruptedException);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice for the next person if there is a diagnostic error message attached to this failure, lest we get a failure that says "false is not true".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Assert.assertTrue(e instanceof InterruptedException);
Mockito.verify(o).abort();
}
Assert.assertTrue(Thread.currentThread().isInterrupted());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@kennknowles
Copy link
Member

Run Java PreCommit

for (Operation op : operations) {
Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
if (op instanceof ReadOperation) {
((ReadOperation) op).abortReadLoop();
}
}
synchronized (this) {
if (currentExecutorThread != null) {
currentExecutorThread.interrupt();
Copy link
Member

@lukecwik lukecwik Oct 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no requirement for user code to handle being interrupted arbitrarily and there are enough instances that I have seen where this would not be handled gracefully by the user.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give an example to consider here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of this change is that user code will typically finish a callback and the abort will occur when control returns to the worker. To a user it should be transparently like other failures. Is that not the case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interrupting the GCS writer has caused issues with how the pipe that is used to transfer data is not gracefully shutdown leaving a blocked thread sitting around indefinitely.

Copy link
Member

@lukecwik lukecwik Oct 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally within Flume we saw with the C++ worker where user code didn't handle thread::cancel correctly and were relying on process crash to not get stuck.

I would generally love for us to be able to interrupt random code arbitrarily as it would be the best and cleanest way to do this and if we want to do this we should really have a way for users to opt-out incase it doesn't work for them. We can watch how many people opt-out and find out why and possibly make this the default forever or choose to make this the default in portable execution as that will require a migration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be misunderstanding, but this is a cooperative interrupt. The interrupt only happens when the user code return control.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline: "user" code can mean many things, and there are some common Java errors that this change could trigger:

  1. User's (who usually won't check the bit) doing catch-all error handling without separating out InterruptedException.
  2. IO libraries (which may often check the bit) doing incomplete or incorrect cleanup and being left in an inconsistent state or leaking resources.

These would ideally both be noticed and handled by higher-level mechanisms since in both cases something should be notably unhealthy about the thread, process, or VM. For now just aborting without interrupting the thread is safer.

@kennknowles
Copy link
Member

Just adding the evidence to our bugs tracking the flakes. I will keep running a few more times.

@kennknowles
Copy link
Member

Run Java PreCommit

@tudorm
Copy link
Contributor Author

tudorm commented Oct 6, 2020

Per offline discussion, remove the delivery of thread interrupt to the worker thread, and instead added async abort signaling up to the GroupingShuffleReader's (values) iterator.

@tudorm
Copy link
Contributor Author

tudorm commented Oct 7, 2020

Run Java PreCommit

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a JIRA associated with this task.

// record it advances over (i.e., for every distinct key), we skip the check when at
// the first value as that is redundant. Signal by thread interruption may be better, but
// it may also have unintended side-effects.
if (!atFirstValue && aborted.get()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking atFirstValue and aborted will likely perform worse then just checking aborted all the time. It may seem redundant but the abort happens asynchronously so we may have gotten past the check in the ReadOperation already.

@tudorm tudorm changed the title Add support for gracefully aborting workers. [BEAM-11044] Add support for gracefully aborting workers. Oct 8, 2020
@lukecwik lukecwik merged commit 1958eae into apache:master Oct 8, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants