Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers #11877

Merged
merged 10 commits into from
Jul 12, 2021

Conversation

wsry
Copy link
Contributor

@wsry wsry commented Apr 23, 2020

What is the purpose of the change

This is the second ingredient besides FLINK-16404 to solve the deadlock problem without exclusive buffers.

The scenario is as follows:

The data in subpartition with positive backlog can be sent without doubt because the exclusive credits would be feedback finally.
Without exclusive buffers, the receiver would not request floating buffers for 0 backlog. But when the new backlog is added into such subpartition, it has no way to notify the receiver side without positive credits ATM.
So it would result in waiting for each other between receiver and sender sides to cause deadlock. The sender waits for credit to notify backlog and the receiver waits for backlog to request floating credits.
To solve the above problem, the sender needs a separate message to announce backlog sometimes besides existing BufferResponse. Then the receiver can get this info to request floating buffers to feedback.

In the current implementation, the upstream task will announce backlog to the downstream task and the downstream task will allocate buffers (credit) to receive data from upstream. This PR enhances the current implementation in three main aspects:

  1. If there is no initial credit, the upstream producer task will announce the available backlog to the downstream consumer task when available data is notified.
  2. The downstream consumer task will release all allocated buffers (credit) on receiving the aligned checkpoint barrier. Besides, it will never allocate any credit before checkpoint completion.
  3. For empty buffers of upstream task, instead of released directly, they will be sent to the downstream task to release the buffers (credit) allocated for these empty buffers.

Brief change log

  • If there is no initial credit, the upstream producer task will announce the available backlog to the downstream consumer task when available data is notified.
  • The downstream consumer task will release all allocated buffers (credit) on receiving the aligned checkpoint barrier. Besides, it will never allocate any credit before checkpoint completion.
  • For empty buffers of upstream task, instead of released directly, they will be sent to the downstream task to release the buffers (credit) allocated for these empty buffers.

Verifying this change

This change is verified by both existing tests and newly added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 23, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 4609625 (Thu Sep 23 18:01:49 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 23, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wsry wsry changed the title [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers May 11, 2020
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
The filed initialCredit of RemoteInputChannel is set only once and can be accessed by multi threads. This patch makes the filed final and moves the initialization to the constructor of RemoteInputChannel to avoid potential thread safety issues.

This closes apache#11877.
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
…o BufferManager#unsynchronizedGetExclusiveBuffers

This closes apache#11877.
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
The filed initialCredit of RemoteInputChannel is set only once and can be accessed by multi threads. This patch makes the filed final and moves the initialization to the constructor of RemoteInputChannel to avoid potential thread safety issues.

This closes apache#11877.
wsry added a commit to wsry/flink that referenced this pull request May 13, 2020
@wsry
Copy link
Contributor Author

wsry commented Jul 9, 2021

@pnowojski I have rebased the latest master branch and appended a fixup commit which fixes the comments and enables 0 exclusive credit for unaligned checkpoint itcase.

if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
subpartitions[i] =
new PipelinedApproximateSubpartition(
i, networkBuffersPerChannel, pipelinedPartition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename networkBuffersPerChannel -> configuredNetworkBuffersPerChannel to better reflect that we are actually overriding this value for the output?

super(index, parent);

checkArgument(buffersPerChannel >= 0, "Buffers per channel must be non-negative.");
this.buffersPerChannel = buffersPerChannel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this property to something like receiverExclusiveBuffersPerChannel? Because actually this is not the number of buffersPerChannel for the sender.

@pnowojski
Copy link
Contributor

Thanks @wsry for all of the changes and your work with addressing the feedback. I think this looks almost good to me % a couple of renames and we need to update the config option descriptions to describe the actual state that there never can be 0 buffers per channel on the sender side. Currently the variable naming in the code and docs/description are misleading in this regard.

@wsry
Copy link
Contributor Author

wsry commented Jul 9, 2021

Thanks @wsry for all of the changes and your work with addressing the feedback. I think this looks almost good to me % a couple of renames and we need to update the config option descriptions to describe the actual state that there never can be 0 buffers per channel on the sender side. Currently the variable naming in the code and docs/description are misleading in this regard.

@pnowojski Thanks very much for the review and feedback. I will update the PR soon accordingly.

wsry added a commit to wsry/flink that referenced this pull request Jul 10, 2021
wsry added a commit to wsry/flink that referenced this pull request Jul 10, 2021
wsry added a commit to wsry/flink that referenced this pull request Jul 10, 2021
wsry added a commit to wsry/flink that referenced this pull request Jul 10, 2021
@wsry
Copy link
Contributor Author

wsry commented Jul 12, 2021

Hi @pnowojski , I have updated the PR with a fixup commit.

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for bothering you, but I hope that's the last one nitty comment 😳

@@ -30,7 +30,7 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value can be configured is 0 and when 0 is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used which means that at least one buffer is needed and this behavior is for performance. If it's not guaranteed that each outgoing channel can get at least one buffer, more partial buffers with little data will be outputted to network/disk and recycled to be used by other channels which can not get a buffer for data caching.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a minor reword to:

The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.

wsry added 10 commits July 12, 2021 18:30
…ogAnnouncement which can bring the upstream buffer backlog to the downstream
…fer for BoundedBlockingSubpartitionDirectTransferReader

Currently, the BoundedBlockingSubpartitionDirectTransferReader does not distinguish data buffer and event buffer but it does not allocate floating credits for events, which means it relies on at least one exclusive credit to send the events. This patch changes the logic and distinguishes data buffer and event buffer for BoundedBlockingSubpartitionDirectTransferReader, after which the BoundedBlockingSubpartitionDirectTransferReader does not rely on the exclusive credits any more and we can set the exclusive credit to 0 after we finish FLINK-16641.
…log to the downstream tasks

This batch introduce the ability of announcing upstream backlog to the downstream tasks through the BacklogAnnouncement message when the exclusive credit is 0. This gives the upstream tasks the ability to actively allocate credits from the downstream tasks, which is needed by FLINK-16641.
…s of RemoteInputChannel on receiving any channel blocking event if the exclusive credit is 0

This patch tries to release all allocated floating buffers of RemoteInputChannel on receiving any channel blocking event if the exclusive credit is 0 because a blocked channel does not need any credit and after that, these released credits can be used by other active channels. This can avoid the deadlock where credits are assigned to channels which do need them and those channels who need credits can not get any when the exclusive credit is 0.
…tasks to release the allocated credits if the exclusive credit is 0

Currently,the empty buffers are not sent to the downstream tasks. This patch changes the logic and sends empty buffers to the downstream tasks when the exclusive credit is 0 release the allocated floating credits. If we do not do that, the downstream task may allocate more credits than needed which may lead to dead lock without exclusive credits.
…nnel to 0

This PR enables to set the number of network buffer per channel (taskmanager.network.memory.buffers-per-channel) to 0. Previously, the value can not be set to 0 because of dead lock, FLINK-16641 solves the problem and we can set it to 0 now.
@wsry
Copy link
Contributor Author

wsry commented Jul 12, 2021

Hi @pnowojski , I have updated the PR.

@pnowojski
Copy link
Contributor

Thanks @wsry , merging.

@pnowojski pnowojski merged commit 60d015c into apache:master Jul 12, 2021
pnowojski pushed a commit that referenced this pull request Jul 12, 2021
pnowojski pushed a commit that referenced this pull request Jul 12, 2021
pnowojski pushed a commit that referenced this pull request Jul 12, 2021
pnowojski pushed a commit that referenced this pull request Jul 12, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants