
[FLINK-28077][checkpoint] Fix the bug that tasks get stuck during cancellation in ChannelStateWriteRequestExecutorImpl #19993

Merged
merged 1 commit into apache:master from 1996fanrui:28077/throw-interrupted on Jun 21, 2022

Conversation

1996fanrui
Member

What is the purpose of the change

Fix the bug that tasks get stuck during cancellation in ChannelStateWriteRequestExecutorImpl

Currently, when an InterruptedException is caught in the ChannelStateWriterThread, we only call writer.fail(e) and do not rethrow the InterruptedException, so the ChannelStateWriterThread cannot exit. We should rethrow the InterruptedException so that the ChannelStateWriterThread can finish.
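
For illustration, a minimal self-contained sketch of the intended behaviour (the class, interface and field names are simplified stand-ins, not the actual Flink code):

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class WriterThreadSketch {

    // Stand-in for the channel state writer that receives the failure.
    interface Writer {
        void fail(Throwable cause);
    }

    private final Writer writer;
    private final BlockingDeque<Runnable> deque = new LinkedBlockingDeque<>();

    WriterThreadSketch(Writer writer) {
        this.writer = writer;
    }

    // Rethrows the InterruptedException so the hosting thread can finish
    // instead of looping forever after a cancellation.
    void runLoop() throws InterruptedException {
        try {
            while (true) {
                deque.take().run(); // blocks; interrupted when the task is cancelled
            }
        } catch (InterruptedException e) {
            writer.fail(e); // report the failure to the writer
            throw e;        // do not swallow the interrupt
        }
    }
}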

Brief change log

Catch the InterruptedException in the ChannelStateWriterThread and rethrow it.

Verifying this change

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not documented

@flinkbot
Collaborator

flinkbot commented Jun 16, 2022

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure — re-run the last Azure build

@zentol
Contributor

zentol commented Jun 16, 2022

AFAICT this hasn't fixed the issue; I can still reproduce it locally, with the task still getting stuck in the same place.

Contributor

@zentol zentol left a comment

Shouldn't we have a test for this behavior? This can crash a TM after all.

Comment on lines 101 to 103
} catch (InterruptedException e) {
writer.fail(e);
throw e;
Contributor

Does this maybe belong to FLINK-27792 instead? AFAICT it isn't required for the issue at hand.

Contributor

How do you think it's related to FLINK-27792?

Contributor

I'm interpreting that ticket as a more general "how to handle InterruptedException" ticket.

Member Author

Hi @zentol @pnowojski , after analyzing this, I found that we don't need this change.

Compared to before, this change just adds a call to writer.fail(e) when the InterruptedException is caught. The previous code did not catch the InterruptedException, so it already propagated whenever it was encountered.

We don't need to call writer.fail(e) when the InterruptedException is caught, because dispatcher.fail() is called before the ChannelStateWriteThread dies. code link.

dispatcher.fail() calls every writer's fail() method, so I think this change isn't necessary. I will revert it in the next commit. We will focus on the next change, i.e. improving the discardAction.
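
For what it's worth, a simplified sketch of the relationship described above, assuming the dispatcher keeps a list of registered writers and fails each of them; the type names are placeholders, not the actual Flink classes:

import java.util.ArrayList;
import java.util.List;

final class DispatcherSketch {

    // Placeholder for the channel state writers mentioned above.
    interface Writer {
        void fail(Throwable cause);
    }

    private final List<Writer> registeredWriters = new ArrayList<>();

    void register(Writer writer) {
        registeredWriters.add(writer);
    }

    // Failing the dispatcher already fails every registered writer, so an extra
    // writer.fail(e) in the thread's catch block would be redundant.
    void fail(Throwable cause) {
        for (Writer writer : registeredWriters) {
            writer.fail(cause);
        }
    }
}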

Comment on lines 115 to 117
if (!dataFuture.isDone()) {
return;
}
Contributor

While this will likely solve the issue, I'm not sure it is the correct solution.

We could see in the logs that this future would eventually be completed, containing several buffers. Admittedly this happened after the hosting TM shut down (so I'm not sure it can happen in production, where the JVM would go down with it), but I do wonder whether this couldn't cause a buffer leak.

Would there be any down-side of doing the clean-up like this:

dataFuture.thenAccept(
        buffers -> {
            try {
                CloseableIterator.fromList(buffers, Buffer::recycleBuffer)
                        .close();
            } catch (Exception e) {
            }
        });

Contributor

Why is the dataFuture not being completed in the first place? Isn't that the real issue?

Contributor

Another question I would ask is why this can even result in a TM crash; shouldn't the waiting be interrupted instead?

Contributor

@pnowojski pnowojski Jun 17, 2022

I agree with @zentol that this doesn't look good and I would be afraid it could lead to some resource leaks.

Why is the dataFuture not being completed in the first place? Isn't that the real issue?

It looks to me like the issue is that dataFuture is being cancelled from the chain: PipelinedSubpartition#release() <- ... <- ResultPartition#release <- ... <- NettyShuffleEnvironment#close. This happens after StreamTask#cleanUp (which is waiting for this future to complete), leading to a deadlock.

We would either need to cancel the future sooner (StreamTask#cleanUp?), or do what @zentol proposed. I think the latter is indeed a good option. We don't need to wait blockingly. Let's just not completely ignore exceptions here; logging an error should be fine.
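
For illustration, a self-contained sketch of that variant, with the empty catch from the earlier snippet replaced by an error log; the Buffer interface and the java.util.logging setup are simplified stand-ins for the real Flink types:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

final class DiscardSketch {

    private static final Logger LOG = Logger.getLogger(DiscardSketch.class.getName());

    // Stand-in for Flink's network Buffer.
    interface Buffer {
        void recycleBuffer();
    }

    // Instead of blocking on the data future during cleanup, recycle the buffers
    // whenever the future eventually completes, and log (rather than swallow) failures.
    static void discard(CompletableFuture<List<Buffer>> dataFuture) {
        dataFuture.thenAccept(
                buffers -> {
                    try {
                        buffers.forEach(Buffer::recycleBuffer);
                    } catch (Exception e) {
                        LOG.log(Level.SEVERE, "Failed to recycle the buffers of a discarded request.", e);
                    }
                });
    }
}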

Contributor

Another question I would ask is why this can even result in a TM crash; shouldn't the waiting be interrupted instead?

Why should it be interrupted? We are only using interrupts to wake up user code or 3rd party libraries. Our own code should be able to shut down cleanly without interruptions. We even explicitly disallow interrupts during StreamTask cleanup (StreamTask#disableInterruptOnCancel), once the task thread exits from user code, as otherwise this could lead to resource leaks. If we cannot clean up resources, we have to rely on the TaskCancelerWatchDog, which will fail over the whole TM after a timeout.

Contributor

TBF I haven't seen an interrupt there myself; that idea is purely based on the proposed change.

Contributor

@pnowojski pnowojski Jun 17, 2022

On second thought, an interrupt can probably happen in this thread when the thread was operating normally/writing something and is being cancelled (ChannelStateWriteRequestExecutorImpl#close). That's fine; an interrupt can be used to wake up the thread from some normal operation (especially writing to a file, which can block). What definitely shouldn't happen is the interrupt causing a resource leak.

Member Author

Hi @zentol @pnowojski , I replied in FLINK-27792. I think the root cause of FLINK-27792 is that we don't clean up the dataFuture reasonably in the discardAction, so we should improve the discardAction.

I also think FLINK-27792 has the same root cause as FLINK-28077: the defective discardAction resulted in both FLINK-27792 and FLINK-28077. Once this PR is done, they will both be resolved.

I know this change still has some bugs; it may cause a network memory leak. I will think more about how to solve it.

Do you think we can handle FLINK-27792 and FLINK-28077 in this PR? If anything is wrong, please correct me.

Member Author

Hi @zentol @pnowojski , as I understand it, there are four places in the ChannelStateWriterThread that may be interrupted:

  1. Writing a file
  2. deque.take(): pulling a request from the deque
  3. dataFuture.get() in the write action: writing the file normally
  4. dataFuture.get() in the discardAction: executed when cancelling a request

Cases 1, 2 and 3 are all executed in ChannelStateWriteRequestExecutorImpl#loop, where the InterruptedException is caught, so they cannot leak an InterruptedException to the job. But case 4 may be called from ChannelStateWriteRequestExecutorImpl#cleanupRequests without catching the InterruptedException, so case 4 can leak an InterruptedException to the job (this is the root cause of FLINK-27792).

Why FLINK-28077? Why do we get stuck in the dataFuture.get() of the discardAction?

ChannelStateWriteRequestExecutorImpl#cleanupRequests calls the discardAction of all remaining requests. The thread.interrupt() only takes effect once: if multiple requests need to wait on dataFuture.get(), only the first get() is interrupted, and the remaining requests get stuck (this is the root cause of FLINK-28077).

If it is changed to @zentol's proposal, both FLINK-27792 and FLINK-28077 will be resolved. I have updated the PR and improved the test ChannelStateWriteRequestExecutorImplTest#testCanBeClosed: with the old code, the unit test gets stuck in the second dataFuture.get(); after the change, it passes.
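
To make the "interrupt only takes effect once" point concrete, here is a small self-contained demo (plain Java, not Flink code): a single interrupt wakes up only the first blocking get(), and the cleanup loop then hangs on the second future.

import java.util.List;
import java.util.concurrent.CompletableFuture;

final class StuckCleanupDemo {

    public static void main(String[] args) throws Exception {
        List<CompletableFuture<Void>> pendingDataFutures =
                List.of(new CompletableFuture<>(), new CompletableFuture<>());

        Thread cleanupThread =
                new Thread(
                        () -> {
                            for (CompletableFuture<Void> dataFuture : pendingDataFutures) {
                                try {
                                    dataFuture.get(); // blocks until completed or interrupted
                                } catch (Exception e) {
                                    // the InterruptedException is consumed here; the next
                                    // get() blocks again with no one left to interrupt it
                                }
                            }
                        });
        cleanupThread.setDaemon(true); // so this demo JVM can still exit
        cleanupThread.start();

        Thread.sleep(100);         // let the thread block on the first get()
        cleanupThread.interrupt(); // wakes up only the first get()
        cleanupThread.join(1000);

        System.out.println("cleanup thread still alive: " + cleanupThread.isAlive()); // true
    }
}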

Contributor

Do you think we can handle FLINK-27792 and FLINK-28077 by this PR?

Yes, we can fix FLINK-28077 in this PR, and then close FLINK-27792 as, for example, a duplicate of FLINK-28077.

@zentol zentol self-assigned this Jun 17, 2022
@1996fanrui 1996fanrui force-pushed the 28077/throw-interrupted branch 3 times, most recently from 6154086 to 16d692f on June 20, 2022 at 05:13
Contributor

@pnowojski pnowojski left a comment

Thanks for the update @1996fanrui. LGTM % one minor comment.

Contributor

@pnowojski pnowojski left a comment

thanks for the update! LGTM. @zentol , do you want to take a look as well?

[FLINK-28077][checkpoint] Fix the bug that tasks get stuck during cancellation in ChannelStateWriteRequestExecutorImpl
@zentol zentol merged commit 0912765 into apache:master Jun 21, 2022
@1996fanrui 1996fanrui deleted the 28077/throw-interrupted branch August 4, 2023 06:30