
Conversation

zhijiangW (Contributor)

What is the purpose of the change

Assuming two remote channels are registered as listeners in the LocalBufferPool, the deadlock happens as follows:

  1. While the Canceler thread is calling ch1#releaseAllResources, it holds ch1's bufferQueue lock and tries to call ch2#notifyBufferAvailable.

  2. While the task thread is exiting and calls CachedBufferStorage#close, it might release exclusive buffers for ch2. ch2 then holds its bufferQueue lock and tries to call ch1#notifyBufferAvailable.

  3. ch1 and ch2 each hold their own bufferQueue lock while waiting for the other side's bufferQueue lock, which causes the deadlock.

Regarding the solution, we can check the released state outside of the bufferQueue lock in RemoteInputChannel#notifyBufferAvailable and return immediately.
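
For illustration, a minimal sketch of this idea in plain Java (not the actual Flink code; field names, types, and the return value are simplified here): the released flag is checked before taking the bufferQueue lock, and double-checked under the lock.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified sketch of the fix described above -- not the actual Flink source.
class RemoteInputChannelSketch {

    private final Object bufferQueueLock = new Object();
    private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Called by the buffer pool when a recycled buffer becomes available. */
    boolean notifyBufferAvailable(Object buffer) {
        // The fix: bail out before entering the synchronized block, so a thread
        // that already holds another channel's lock is never blocked here.
        if (isReleased.get()) {
            return false;
        }
        synchronized (bufferQueueLock) {
            // Double-check under the lock in case the channel was released concurrently.
            if (isReleased.get()) {
                return false;
            }
            bufferQueue.add(buffer);
            return true;
        }
    }

    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (bufferQueueLock) {
                bufferQueue.clear(); // the real code would recycle these buffers back to the pool
            }
        }
    }
}
```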

Brief change log

  • Check the isReleased state before entering the synchronized block in RemoteInputChannel#notifyBufferAvailable

Verifying this change

Verified via the failing StreamFaultToleranceTestBase.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 356e564 (Thu Jul 16 10:05:14 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@zhijiangW (Contributor, Author)

I considered some options to resolve this issue:

  1. Get rid of the canceler thread completely to avoid the race condition in SingleInputGate and the respective channels, by delegating the work to the mailbox mechanism. This fits the long-term goal, but involves many changes which should be done in a separate future ticket.

  2. Introduce something like a BufferListener#isReleased interface method, or explicitly remove listeners from the LocalBufferPool when the respective channel is released (see the sketch after this list). This would add weight to the interface and complexity to the release procedure.

  3. The approach of the current PR: allow notifying an available buffer to a released channel; the channel first checks its released state outside of the synchronized block and exits immediately.
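
To make option 2 concrete, a purely hypothetical sketch (the isReleased method and the pool internals shown here are illustrative only and do not reflect Flink's actual BufferListener or LocalBufferPool): the pool would have to query each listener's released state and drop released ones during recycling, which is the extra interface surface and release-time complexity mentioned above.

```java
import java.util.ArrayDeque;

// Hypothetical interface extension for option 2 -- not Flink's actual API.
interface BufferListener {
    /** Extra query method the pool would have to consult before notifying. */
    boolean isReleased();

    void notifyBufferAvailable(Object buffer);
}

class LocalBufferPoolSketch {
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    void recycle(Object buffer) {
        BufferListener listener;
        synchronized (registeredListeners) {
            // The pool itself now has to skip released channels, coupling the
            // channel release procedure to the pool's listener bookkeeping.
            do {
                listener = registeredListeners.poll();
            } while (listener != null && listener.isReleased());
        }
        if (listener != null) {
            listener.notifyBufferAvailable(buffer);
        }
    }
}
```

Compared to this, option 3 keeps the pool untouched and places the check inside the channel itself.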

Regarding verification, I cannot reproduce this issue locally via the reported StreamFaultToleranceTestBase. I could also add a unit test to verify it if necessary. I remember there was a discussion about whether it is necessary to add unit tests like RemoteInputChannel#testConcurrentRecycleAndRelease to verify concurrency issues, and the conclusion seemed to be to rely on existing ITCases where possible, so I did not write a new unit test in this case.

@flinkbot (Collaborator) commented Jul 16, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan (Contributor)

@flinkbot run azure

@curcur (Contributor) commented Jul 19, 2020

  1. Yes, I like Option 1. It looks very strange that two threads are trying to release the network resources at the same time.

  2. Wondering whether releasing the input channels in the same order in both places would resolve the problem as well, but I'm not sure how difficult that would be in this case.

  3. The code change seems to be based on an old interface, so it may need some adjustments I guess. The new one looks like this:
    public void notifyBufferAvailable(int numAvailableBuffers) throws IOException ...

  4. From purely reading the code, I think the current solution should be able to resolve the deadlock. But I have to admit I do not understand the details of how the exact notifications of available buffers work.

Notice that isReleased is set to true in RemoteInputChannel#releaseAllResources, which is called in the Canceler thread, but not in the task thread (you can verify it through the stack trace).

For the deadlock:

Case 1:
The Canceler thread sets ch1 released, grabs the bufferQueue lock of ch1, and waits for the bufferQueue lock of ch2 for the notification.

The task thread grabs the bufferQueue lock of ch2; its notification to ch1 returns immediately because ch1 has already been set released.

This won't cause a deadlock.

Case 2:
The task thread grabs the bufferQueue lock of ch2, then grabs the bufferQueue lock of ch1 before ch1 is set released by the Canceler thread.

The Canceler thread is able to call releaseAllResources, but not able to grab ch1's lock until the task thread releases it.

This won't cause a deadlock either.

@zhijiangW (Contributor, Author)

@curcur
Re 2. I did not get your point here. Can you explain it a bit?
Re 3: this interface was changed in release-1.11, and this PR is a fix based on release-1.10. For release-1.11 and the master branch, we should submit a separate PR accordingly.

Notice that isReleased is set to true in RemoteInputChannel#releaseAllResources, which is called in the Canceler thread, but not in the task thread (you can verify it through the stack trace).

RemoteInputChannel#releaseAllResources is called by the Canceler thread. After the isReleased state is set to true, it has no effect even if the task thread calls it again later. The deadlock is actually caused by the task thread calling StreamTask#cleanup, which releases the exclusive buffers while executing CachedBufferStorage#close. While recycling the exclusive buffers, it takes the lock of ch2 and triggers ch1#notifyBufferAvailable, which waits for the lock of ch1.

So as long as two threads might recycle buffers concurrently, this potential deadlock can occur. E.g., while the Canceler thread is releasing the channel and recycling the received buffers, the task thread might also recycle a buffer at the same time once it has been consumed completely.
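
As a standalone illustration (not Flink code; ch1Lock and ch2Lock stand in for the two channels' bufferQueue locks), the following self-contained program reproduces the lock-ordering cycle described above and hangs when run:

```java
// Standalone model of the interleaving described above -- two threads recycle
// buffers concurrently and take the two channel locks in opposite order.
public class TwoChannelDeadlockModel {
    private static final Object ch1Lock = new Object();
    private static final Object ch2Lock = new Object();

    public static void main(String[] args) {
        Thread canceler = new Thread(() -> {
            synchronized (ch1Lock) {              // ch1#releaseAllResources
                sleep(100);                       // widen the race window
                synchronized (ch2Lock) {          // ch2#notifyBufferAvailable
                    System.out.println("canceler done");
                }
            }
        });
        Thread task = new Thread(() -> {
            synchronized (ch2Lock) {              // recycle exclusive buffers of ch2
                sleep(100);
                synchronized (ch1Lock) {          // ch1#notifyBufferAvailable
                    System.out.println("task done");
                }
            }
        });
        canceler.start();
        task.start();                             // with these sleeps, both threads block forever
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The fix in this PR breaks the cycle by letting notifyBufferAvailable return before trying to take the second lock when the target channel is already released.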

@curcur (Contributor) commented Jul 21, 2020

Hey @zhijiangW, I do not disagree with the solution of checking isReleased.

For the second point, I mean: if the two threads cleaned up the channels in the same order (sorted by channel id, for example), would that resolve the deadlock problem as well?

@zhijiangW (Contributor, Author)

For the second point, I mean: if the two threads cleaned up the channels in the same order (sorted by channel id, for example), would that resolve the deadlock problem as well?

@curcur thanks for the explanation, I got your point now. But that is not the case for the current deadlock. In the current situation, the two threads do not clean up the input channels concurrently. Only one thread releases the channels, while the other thread recycles buffers and thereby touches the internal lock of the respective input channel. So in this case we cannot sort or otherwise control the order of the recycled buffers, which are maintained inside the CachedBufferStorage.

@rkhachatryan (Contributor) left a comment


LGTM

@zhijiangW (Contributor, Author) commented Jul 28, 2020

Thanks for the review @rkhachatryan and @curcur, the Azure failure seems unrelated, so merging!

zhijiangW merged commit e268680 into apache:release-1.10 on Jul 28, 2020