
[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock #6254

Closed

Conversation

NicoK (Contributor) commented on Jul 4, 2018

What is the purpose of the change

When exclusive buffers of a RemoteInputChannel are recycled concurrently with (other/floating) buffers being recycled back to the buffer pool, while the RemoteInputChannel is registered as a listener on that pool and adding the exclusive buffer causes a floating buffer to be recycled back to the same buffer pool, a deadlock can occur: two threads hold the locks on LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue but acquire them in reverse order.

One such instance would be (thanks @zhijiangW for finding this):

Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
 -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
 -> try to lock(RemoteInputChannel2#bufferQueue)

Task thread -> RemoteInputChannel2#recycle
 -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
 -> try to lock(LocalBufferPool#availableMemorySegments)
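
For illustration, here is a minimal, self-contained Java sketch of the reverse lock-ordering pattern described above. The lock objects and thread bodies are simplified stand-ins for LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue, not the actual Flink classes; run as-is, the two threads block each other permanently.

public class LockOrderDeadlockSketch {

    // Simplified stand-ins for LocalBufferPool#availableMemorySegments (the pool lock)
    // and RemoteInputChannel#bufferQueue (the channel lock).
    private static final Object availableMemorySegments = new Object();
    private static final Object bufferQueue = new Object();

    public static void main(String[] args) {
        // Task canceler thread: recycles floating buffers under the pool lock and then
        // calls notifyBufferAvailable(), which needs the channel's bufferQueue lock.
        Thread taskCanceler = new Thread(() -> {
            synchronized (availableMemorySegments) {
                sleep(100); // give the other thread time to grab bufferQueue
                synchronized (bufferQueue) {
                    // notifyBufferAvailable() work would happen here
                }
            }
        });

        // Task thread: recycles an exclusive buffer under the bufferQueue lock, which evicts
        // a floating buffer and recycles it back to the pool, needing the pool lock.
        Thread taskThread = new Thread(() -> {
            synchronized (bufferQueue) {
                sleep(100);
                synchronized (availableMemorySegments) {
                    // LocalBufferPool#recycle() work would happen here
                }
            }
        });

        taskCanceler.start();
        taskThread.start();
        // Both threads now wait for the lock the other one holds: a classic deadlock.
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}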

@pnowojski and @tillrohrmann can you also have a quick look so that this can get into 1.5.1?

Brief change log

  • clarify the contract of BufferListener#notifyBufferAvailable() (see the JavaDocs in the code)
  • make sure that no code path in RemoteInputChannel breaks this contract, i.e. wherever a lock on bufferQueue is taken, no buffer may be recycled while that lock is held (see the sketch below)
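
The second point is the core of the fix pattern. The following is a minimal caller-side sketch of it, with simplified, hypothetical names (the actual RemoteInputChannel code is more involved): a buffer that becomes superfluous while bufferQueue is locked is only collected under the lock and recycled after the lock has been released.

class RecycleOutsideLockSketch {

    /** Minimal stand-in for the network buffer type. */
    interface Buffer {
        void recycleBuffer();
    }

    private final Object bufferQueueLock = new Object();

    void recycleExclusiveBuffer(Buffer exclusiveBuffer) {
        Buffer floatingBufferToRecycle;
        synchronized (bufferQueueLock) {
            // mutate the queue under the lock, but only *collect* the buffer that has
            // become superfluous instead of recycling it right away
            floatingBufferToRecycle = addExclusiveBufferAndMaybeEvictFloating(exclusiveBuffer);
        }
        // recycle outside the lock, so the LocalBufferPool lock is never acquired
        // while bufferQueueLock is held
        if (floatingBufferToRecycle != null) {
            floatingBufferToRecycle.recycleBuffer();
        }
    }

    /** Hypothetical helper standing in for BufferQueue#addExclusiveBuffer; details omitted. */
    private Buffer addExclusiveBufferAndMaybeEvictFloating(Buffer buffer) {
        return null;
    }
}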

Verifying this change

This change added tests and can be verified as follows:

  • added RemoteInputChannelTest#testConcurrentRecycleAndRelease2 which catches this deadlock quite quickly

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (this is per-buffer code, and we only move recycling out of the synchronized block, so any effect should be positive)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? JavaDocs

pnowojski (Contributor) commented:
Please document in the JavaDoc AND in the commit message the contract of the lock hierarchy that you are implementing here.

[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock

When recycling exclusive buffers of a RemoteInputChannel and recycling
(other/floating) buffers to the buffer pool concurrently while the
RemoteInputChannel is registered as a listener to the buffer pool and adding the
exclusive buffer triggers a floating buffer to be recycled back to the same
buffer pool, a deadlock would occur holding locks on
LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue but
acquiring them in reverse order.

One such instance would be:

Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers
 -> lock(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable
 -> try to lock(IC2#bufferQueue)

Task thread -> IC2#recycle
 -> lock(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
 -> try to lock(LocalBufferPool#availableMemorySegments)
NicoK (Author) commented on Jul 4, 2018

I updated the commit with a fixed unit test and added a big TODO on what else would need to be adapted in order to go with this solution (no updated commit message yet).

Since this is even more intrusive and ugly than the changes before, I'll close this PR in favour of another solution developed in #6257.

NicoK closed this on Jul 4, 2018
 */
-int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
+Tuple2<Integer, Optional<Buffer>> addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
     exclusiveBuffers.add(buffer);
     if (getAvailableBufferSize() > numRequiredBuffers) {
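
To make the intent of the signature change more concrete, here is a hedged, self-contained sketch of what the changed method could look like as a whole. Assumptions not visible in the excerpt: the Integer component carries the new available-buffer count, and the Optional<Buffer> carries the floating buffer that the caller must recycle after releasing the bufferQueue lock; Buffer and Tuple2 are simplified stand-ins, not the Flink types.

import java.util.ArrayDeque;
import java.util.Optional;

class BufferQueueSketch {
    interface Buffer { void recycleBuffer(); }
    record Tuple2<A, B>(A f0, B f1) {}

    private final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

    int getAvailableBufferSize() {
        // assumption: available buffers = exclusive + floating buffers currently queued
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    Tuple2<Integer, Optional<Buffer>> addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
        exclusiveBuffers.add(buffer);
        // emptiness check added defensively for this sketch; the excerpt only checks the size
        if (getAvailableBufferSize() > numRequiredBuffers && !floatingBuffers.isEmpty()) {
            // hand the now-superfluous floating buffer back to the caller instead of
            // recycling it while the bufferQueue lock is still held
            return new Tuple2<>(getAvailableBufferSize(), Optional.of(floatingBuffers.poll()));
        }
        return new Tuple2<>(getAvailableBufferSize(), Optional.empty());
    }
}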
Contributor commented:
To be stricter, shouldn't getAvailableBufferSize() > numRequiredBuffers be getAvailableBufferSize() == numRequiredBuffers + 1?
Otherwise it reads as if the number of available buffers could exceed the required buffers by more than one, while we only return one floating buffer each time. Actually it can only exceed by one in our design. Maybe this is out of scope for this JIRA, so you can ignore it.

NicoK (Author) commented:
True, we could change that, but the BufferQueue logic in addExclusiveBuffer and addFloatingBuffer does not enforce this contract (only outside code does). If outside code decided to break this contract (I can't think of a reason why), then the current one-for-one logic with getAvailableBufferSize() > numRequiredBuffers ensures that we do not deviate further from the target.
This is purely a theoretical concern and definitely out of the scope of this ticket; I would leave it unchanged for now since we don't gain anything with that change.

Contributor commented:
Yes, this is not critical. I just wanted to exchange thoughts with you! :)

@@ -479,6 +508,9 @@ void onSenderBacklog(int backlog) throws IOException {

     numRequiredBuffers = backlog + initialCredit;
     while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
+        // TODO: this will take a lock in the LocalBufferPool as well and needs to be done
+        // outside the synchronized block (which is a bit difficult when trying to acquire the
+        // lock only once)!
         Buffer buffer = inputGate.getBufferPool().requestBuffer();
Contributor commented:
In my previous implementation, I added a new method to the BufferProvider interface to request a batch of buffers at a time. That way, the lock in LocalBufferPool only needs to be taken once, which may be more efficient from the locking perspective. The only concern is that the floating buffer distribution may not be fair across all channels. So we want to implement two strategies: a fair strategy that requests one buffer at a time, and a greedy strategy that requests all required buffers at once, and then compare them to check performance. They may behave differently in different scenarios. I am planning to submit this JIRA soon. What do you think?
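
A rough sketch of what such a batch-request method could look like (purely hypothetical: this requestBuffers(int) method is not an existing Flink API, and the fields below are simplified stand-ins for LocalBufferPool internals):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class BatchRequestSketch {
    interface Buffer {}

    private final Object availableMemorySegments = new Object(); // pool lock stand-in
    private final ArrayDeque<Buffer> segments = new ArrayDeque<>();

    List<Buffer> requestBuffers(int numBuffers) {
        List<Buffer> result = new ArrayList<>(numBuffers);
        synchronized (availableMemorySegments) {
            // single lock acquisition for the whole batch (the "greedy" strategy);
            // requesting one buffer per call would correspond to the "fair" strategy
            while (result.size() < numBuffers && !segments.isEmpty()) {
                result.add(segments.poll());
            }
        }
        return result;
    }
}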

NicoK (Author) commented:
I'm not sure about this change, since it may lead to lower-throughput channels being starved by high-throughput channels, but we can discuss this further with a broader audience in the JIRA ticket once it is created (this is off-topic here).

Contributor commented:
Agree with your suggestion.
