
Conversation

@wsry
Contributor

@wsry wsry commented Feb 13, 2020

What is the purpose of the change

To speed up checkpointing in the case of back pressure, this commit tries to reduce the amount of in-flight data by reducing the default number of buffers per channel from 2 to 1. Together with the default 8 floating buffers, one buffer per channel should be enough for most cases without performance regression, and one can increase it if there are any performance issues.
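For reference, the two settings discussed here can also be tuned explicitly when the new default is not a good fit. A minimal sketch, assuming the option keys taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.floating-buffers-per-gate and Flink's Configuration API (the helper class and method names below are made up for illustration):

import org.apache.flink.configuration.Configuration;

public class NetworkBufferSettings {
    // Hypothetical helper that builds a configuration trading some throughput
    // for less in-flight data, in line with the change proposed here.
    public static Configuration lowInFlightDataConfig() {
        Configuration config = new Configuration();
        // Exclusive buffers per outgoing/incoming channel (the default this PR changes from 2 to 1).
        config.setInteger("taskmanager.network.memory.buffers-per-channel", 1);
        // Floating buffers shared by all channels of an input gate; raising this can
        // compensate if the reduced exclusive buffers cause a throughput drop.
        config.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);
        return config;
    }
}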

Brief change log

  • Reduce the default number of buffers per channel from 2 to 1 and update the documentation.

Verifying this change

  • This change is already covered by existing tests.
  • Performance is verified by benchmarks.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Feb 13, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 424b744 (Thu Sep 23 18:02:23 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Feb 13, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wsry
Contributor Author

wsry commented Feb 18, 2020

Theoretically, reducing the number of buffers may break the data processing pipeline, which can hurt performance. For verification, I have tested the change using the Flink micro-benchmarks and a simple benchmark job. Unfortunately, regressions are seen in both tests.

For the micro-benchmarks, the following are some results with regression (because the results are unstable, I ran each test three times):

Using 2 buffers per channel:

Benchmark                             (channelsFlushTimeout)  (writers)   Mode  Cnt      Score      Error   Units

networkThroughput                            1000,100ms          1       thrpt   30  15972.952 ±  752.985  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  27650.498 ±  713.728  ops/ms

networkThroughput                            1000,100ms          1       thrpt   30  15566.705 ± 2007.335  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  27769.195 ± 1632.614  ops/ms

networkThroughput                            1000,100ms          1       thrpt   30  15598.175 ± 1671.515  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  27499.901 ± 1035.415  ops/ms

Using 1 buffer per channel:

Benchmark                             (channelsFlushTimeout)  (writers)   Mode  Cnt      Score      Error   Units

networkThroughput                            1000,100ms          1       thrpt   30  13116.610 ±  325.587  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  22837.502 ± 1024.360  ops/ms

networkThroughput                            1000,100ms          1       thrpt   30  11924.883 ± 1038.508  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  22823.586 ±  892.918  ops/ms

networkThroughput                            1000,100ms          1       thrpt   30  12960.345 ± 1596.465  ops/ms
networkThroughput                            1000,100ms          4       thrpt   30  23028.803 ±  933.609  ops/ms

From the above results, we can see about a 20% performance regression. For the benchmark job, there are also regressions (about 10% - 20%) in some cases where the number of input channels is small, for example 2 input channels, which means the number of buffers that can be used is limited.

@pnowojski
Contributor

@wsry thanks for the results. Have you tried increasing the floating buffers to compensate for the fewer exclusive buffers? Also, it would be nice to check the micro-benchmark results on our benchmarking machine, but I think @zhijiangW is already doing that.

@zhijiangW
Contributor

Yes, I executed the micro-benchmarks on our benchmarking machine yesterday, and almost all the network throughput tests show a slight regression except the data skew case.

I confirmed with @wsry to also verify the results for reducing only the receiver side, keeping the buffers on the sender side the same as before. The results still show a slight regression, but better than reducing both the receiver and sender sides.

As a next step, we can further test how many buffers are really needed on both the output and input sides at different scales without regression. Then we can decide how to tune this setting, based on job scale or a dynamic option.

@wsry
Contributor Author

wsry commented Feb 18, 2020

I did not increase the number of floating buffers. I guess we can make up for the regression if we increase the number of floating buffers (maybe the only problem is that the floating buffers can't always be fulfilled). I'd like to do some tests to see how many floating buffers we need to compensate for the regression in the micro-benchmarks.

@zhijiangW
Contributor

zhijiangW commented Feb 18, 2020

We previously had the JIRA ticket FLINK-9142 about reducing the exclusive buffers from 2 to 1 and putting them into floating buffers instead. That means the total number of buffers on the receiver side is not changed.

We can also verify this approach before determining how many floating buffers are required to keep the performance. If the performance regression still exists with the same total buffer amount, then we should consider the cost difference between the exclusive and floating buffer roles.

@pnowojski
Contributor

Keep in mind that the purpose of this is to reduce the amount of in-flight data, so just moving the exclusive buffers to the floating pool does not help us with this. I was hoping that in setups with 1000 channels, decreasing the exclusive buffers from 2000 to 1000 while increasing the floating buffers from 7 to, for example, 30 might give us a good balance.

maybe the only problem is that the floating buffers can't always be fulfilled

That's also true for exclusive buffers. If there is not enough memory/buffers on the TM, the network stack will allocate only one exclusive buffer per channel. That also means that, for some users (I would guess quite a lot of them), decreasing the exclusive buffers from 2 to 1 will not reduce the amount of in-flight records.

@zhijiangW
Contributor

So just moving the exclusive buffers to the floating pool does not help us with this.

Yes, that is not the final solution for our ultimate purpose. My suggestion was only for analyzing the performance regression step by step for better tracing. I suspect the performance might be sensitive to two factors: one is the total amount of buffers on the receiver side, and the other is the ratio of exclusive to floating buffers.

So if we keep the total buffer amount the same and only change the buffer role, and there is no regression in the benchmark at all, then we can focus only on the total floating buffer amount. If not, we should also keep an eye on whether there is extra cost for requesting floating buffers.

I hope the second factor is an unnecessary worry.

@wsry
Contributor Author

wsry commented Feb 19, 2020

I have tested reducing the exclusive buffers from 2 to 1 and putting them into floating buffers instead. It has no evident influence on the performance.
I also tried to tune the floating buffers per gate to see how many floating buffers can compensate for the regression. Unfortunately, the results show that we need about 1000 floating buffers, which means we can't reduce the upstream buffers if we expect no performance regression.
Can we implement a dynamic policy directly without first reducing the buffers per channel statically? Any suggestions? @pnowojski @zhijiangW

@wsry
Contributor Author

wsry commented Feb 27, 2020

I did some further performance tests using the micro-benchmarks with the changes in #11155, and the results are as follows. From the results we can see that if we reduce the buffers per channel of both upstream and downstream, then we need an almost equivalent number of floating buffers to compensate for the reduced ones and mitigate the performance regression. If we reduce the exclusive buffers of the downstream only and keep the upstream unchanged, there is no visible regression, and the performance of the fast-flush cases (1ms flushTimeout) becomes even better. IMO, the reason is that fewer credits prevent the upstream from sending too many small buffers to the downstream, which can increase the TPS.
It seems that the performance regression mainly comes from the upstream. Why is the upstream that sensitive to the reduction of buffers? IMO, it is because our micro-benchmark case is a little special; more specifically, it can request a large number of buffers in a really short time when the first round of buffers is full. Because we use a round-robin partitioner and send long records, all the subpartitions will be filled up almost at the same time and will request the next buffer at the same time, that is, requesting a large number of buffers in a really short time, which leads to the sensitivity.

What do you think? Should we reduce the number of buffers statically? @pnowojski @zhijiangW

settings: 2 buffers per outgoing channel / 2 buffers per incoming channel / 8 floating buffers
round 1#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  18206.419 ±  533.180  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  25378.527 ±  747.900  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27937.504 ±  553.818  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7600.337 ± 1264.666  ops/ms

round 2#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17277.088 ±  925.827  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24673.112 ± 3202.755  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27124.349 ± 1001.959  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6991.067 ±  497.079  ops/ms

round 3#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score     Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  16575.334 ± 856.037  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  25445.706 ± 751.636  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27552.088 ± 761.005  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7328.127 ± 336.441  ops/ms

settings: 1 buffer per outgoing channel / 1 buffer per incoming channel / 8 floating buffers
round 1#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  18317.961 ±  937.388  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  19269.037 ± 2387.532  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  22981.454 ± 2860.808  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7236.713 ±  302.389  ops/ms

round 2#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  19105.135 ±  575.752  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  19845.468 ± 2697.928  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  22150.188 ±  953.592  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6960.408 ±  901.595  ops/ms

round 3#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17473.902 ± 1233.335  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  19616.336 ±  743.647  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  21883.304 ± 2610.933  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6560.487 ± 1349.803  ops/ms

settings: 1 buffer per outgoing channel / 1 buffer per incoming channel / 508 floating buffers
round 1#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17930.067 ±  990.492  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24189.003 ±  532.873  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  26015.059 ±  827.189  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6614.732 ± 1127.513  ops/ms

round 2#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  16874.222 ± 1501.294  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  23835.932 ±  676.615  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  25974.407 ±  739.122  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6942.174 ±  860.100  ops/ms

round 3#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17459.868 ± 2264.240  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  22951.319 ±  430.990  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  25847.325 ± 1076.960  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7060.140 ±  770.352  ops/ms

settings: 1 buffer per outgoing channel / 1 buffer per incoming channel / 1008 floating buffers
round 1#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17528.332 ± 2490.253  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24555.332 ±  696.186  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27949.803 ±  523.245  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7055.183 ±  833.954  ops/ms

round 2#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17191.842 ± 2327.408  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  25676.208 ± 2993.290  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27522.535 ±  695.564  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7096.016 ±  434.702  ops/ms

round 3#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  15859.857 ± 2824.002  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  25809.716 ± 3054.423  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  28080.639 ±  653.612  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6841.500 ± 1163.974  ops/ms

If we only reduce the exclusive buffers of the downstream and keep the upstream unchanged:
settings: 2 buffers per outgoing channel / 1 buffer per incoming channel / 8 floating buffers
round 1#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  18823.216 ± 1014.368  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24954.098 ±  924.354  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  26578.547 ±  570.707  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6903.210 ±  300.143  ops/ms

round 2#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  16434.840 ± 1156.454  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  26084.775 ±  938.799  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27116.358 ±  983.822  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6627.874 ± 1159.660  ops/ms

round 3#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  18914.084 ± 1073.938  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24259.843 ± 2971.212  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27398.183 ±  826.757  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6510.502 ± 1115.598  ops/ms

round 4#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17077.960 ± 1069.516  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  24807.979 ± 3180.461  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  27735.840 ±  717.570  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   7248.710 ±  334.596  ops/ms

round 5#

Benchmark                                                                 (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                     N/A  thrpt   30  17437.883 ±  706.215  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                              1000,1ms  thrpt   30  26347.947 ±  866.912  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms  thrpt   30  28161.395 ±  819.070  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,SSL  thrpt   30   6785.905 ± 1137.939  ops/ms

@zhijiangW
Contributor

Thanks for the updates @wsry !

If we only adjust the exclusive buffers from 2 to 1 on the receiver side and still keep 2 on the sender side, then the micro-benchmark has no regressions. We could further verify it in cluster jobs because the network transport delay might behave differently from the micro-benchmark.

If the conclusion is the same, then I think it is reasonable to make this change for the receiver side. We can save almost 25% of the in-flight buffers. But we might need to introduce another buffer setting parameter, which would break compatibility.

Regarding the upstream side, I guess it might not be feasible to give a static buffer setting at setup time, since it depends more on the data distribution among subpartitions. The hash partitioner might have similar behavior to the rebalance partitioner in practice. We can further consider a dynamic approach on the sender side in another ticket.

@pnowojski
Contributor

Yes, thank you for the results @wsry. Indeed it looks like the idea of decreasing the exclusive buffers on the receiver side to 1 by default is worth pursuing.

Actually I'm a bit surprised that reducing exclusive buffers to 1 on the sender side is causing problems. The combination of 2 exclusive buffers + ~8 floating buffers was estimated to provide good bandwidth for single-channel performance. Assuming a 1ms round trip for requesting/receiving a network credit, 10 buffers alone should support ~320MB/s of network traffic (32KB * 10 / 1ms = 320MB/s). What is the network traffic of our network benchmarks?

28161 ops/ms * 32 bytes/op = ~900MB/s. From this, I would expect 30 floating buffers to suffice on your machine, unless the round trip of credit assignment on localhost is more than 1ms?
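To make the arithmetic above explicit, here is a small sketch of the same back-of-envelope calculation; the 1 ms credit round trip and the 32 KB buffer size are assumptions taken from this discussion, not measurements:

public class CreditBandwidthEstimate {
    public static void main(String[] args) {
        double bufferSizeBytes = 32 * 1024;   // 32 KB network buffer (segment) size
        double roundTripSeconds = 0.001;      // assumed 1 ms credit round trip
        int buffersPerChannel = 2 + 8;        // 2 exclusive + ~8 floating buffers

        // Traffic sustainable by 10 buffers within one credit round trip (~320 MB/s).
        double sustainableBytesPerSecond = bufferSizeBytes * buffersPerChannel / roundTripSeconds;
        System.out.printf("sustainable: ~%.0f MB/s%n", sustainableBytesPerSecond / 1_000_000);

        // Observed benchmark traffic: ~28161 ops/ms with 32-byte records (~900 MB/s).
        double observedBytesPerSecond = 28_161.0 * 1_000 * 32;
        System.out.printf("observed: ~%.0f MB/s%n", observedBytesPerSecond / 1_000_000);

        // Buffers needed to keep that traffic flowing for one round trip (~28,
        // in line with the ~30 floating buffers estimated above).
        double buffersNeeded = observedBytesPerSecond * roundTripSeconds / bufferSizeBytes;
        System.out.printf("buffers per round trip: ~%.0f%n", buffersNeeded);
    }
}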

Regardless of that, as I discussed with @zhijiangW offline, for many scenarios (probably most) we can accept a performance regression in favour of smaller memory requirements and faster checkpointing under back pressure - at least as an alternative configuration of Flink. In other words, we could document two recommended configuration setups:

  1. (current one) high throughput mode
  2. (limited exclusive buffers, as we are discussing, maybe down to 0) fast checkpointing/low in-flight data mode.

@wsry
Contributor Author

wsry commented Mar 1, 2020

@pnowojski IMO, the reason why reducing the buffers on the sender side causes problems is that our benchmark can request a large number of buffers in a really short time. Our benchmark uses a round-robin partitioner and the record size is fixed, so almost all subpartitions fill up the current buffer and need to request the next one at the same time. If there are not enough buffers at that moment, the sender thread needs to wait for all previously sent buffers to be recycled, which takes some time.

I agree that we need to document the configuration setups you mentioned. Besides reducing the number of buffers, users can also try to reduce the size of the network buffers to reduce in-flight data in cases of back pressure (or maybe we can reduce the buffer size dynamically).
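As a rough illustration of how both the buffer count and the buffer size bound the in-flight data: the 32 KB segment size below is the default, and the estimator itself is a hypothetical sketch, not Flink code.

public class InFlightDataEstimate {
    // Rough upper bound on per-gate in-flight data under back pressure:
    // every exclusive and floating buffer may be filled and queued.
    public static long maxInFlightBytes(int channels, int exclusivePerChannel,
                                        int floatingPerGate, int bufferSizeBytes) {
        return ((long) channels * exclusivePerChannel + floatingPerGate) * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024; // default segment size; a smaller size directly shrinks the bound
        // 1000 channels: old default (2 exclusive) vs. proposed default (1 exclusive), 8 floating.
        System.out.println(maxInFlightBytes(1000, 2, 8, bufferSize)); // ~64 MB
        System.out.println(maxInFlightBytes(1000, 1, 8, bufferSize)); // ~32 MB
        // Halving the buffer size halves the bound as well.
        System.out.println(maxInFlightBytes(1000, 1, 8, bufferSize / 2)); // ~16 MB
    }
}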

@pnowojski
Contributor

I guess you might be right @wsry. If the distribution were random, the buffers would fill up gradually and fewer floating buffers should suffice for smooth progress. We could also explore this option @wsry - replacing the RoundRobinSelector with some trivial pseudo-random selector like this?
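For illustration only, a trivial selector along those lines might look like the sketch below; this is a hypothetical standalone class, not Flink's actual ChannelSelector interface, and all names in it are made up:

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical pseudo-random selector sketch: unlike round-robin, it does not fill
// all subpartition buffers at exactly the same rate, so buffer requests are spread
// out over time instead of arriving in one burst.
public class PseudoRandomChannelSelector {
    private final int numberOfChannels;

    public PseudoRandomChannelSelector(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    public int selectChannel() {
        // Any cheap pseudo-random or hashed-counter pick would do here.
        return ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
}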

@wsry
Contributor Author

wsry commented Mar 3, 2020

@pnowojski
I will try to replace the RoundRobinSelector with some random partitioner and will update if there are any new results.

@wsry
Contributor Author

wsry commented Mar 20, 2020

Sorry for the late update. I implemented an imbalance partitioner which emits record n to channel n, and the following are the results (I ran the tests multiple times and the results can be reproduced). After reducing the buffers per channel from 2 to 1 (for both upstream and downstream), there is still a regression (about 10%) if 8 floating buffers are used. After I increased the floating buffers to 128 (64 is not enough, and other values between 64 and 128 were not tested), the performance seems to catch up.

@pnowojski @zhijiangW What do you think about the results? Should we reduce both the upstream and downstream buffers per channel to 1, or only reduce the downstream buffers (we may need to add a new config option)?

2 buffers per channel and 8 floating buffers per gate

Benchmark                                                                                        (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
CustomPartitionerStreamNetworkThroughputBenchmarkExecutor.imbalancePartitionerNetworkThroughput                     N/A  thrpt   30  34689.075 ± 4374.022  ops/ms
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                                            N/A  thrpt   30  18461.286 ± 1127.107  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                     1000,1ms  thrpt   30  23221.891 ± 4628.278  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                   1000,100ms  thrpt   30  29338.881 ± 1233.042  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                               1000,100ms,SSL  thrpt   30   7203.058 ±  822.253  ops/ms

1 buffer per channel and 8 floating buffers per gate

Benchmark                                                                                        (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
CustomPartitionerStreamNetworkThroughputBenchmarkExecutor.imbalancePartitionerNetworkThroughput                     N/A  thrpt   30  29087.328 ± 4030.118  ops/ms
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                                            N/A  thrpt   30  18973.591 ± 1241.189  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                     1000,1ms  thrpt   30  22216.006 ± 1017.993  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                   1000,100ms  thrpt   30  23267.835 ±  976.012  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                               1000,100ms,SSL  thrpt   30   7403.758 ±  820.510  ops/ms

1 buffer per channel and 128 floating buffers per gate

Benchmark                                                                                        (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
CustomPartitionerStreamNetworkThroughputBenchmarkExecutor.imbalancePartitionerNetworkThroughput                     N/A  thrpt   30  33130.029 ±  676.771  ops/ms
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                                            N/A  thrpt   30  18229.952 ± 2121.084  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                     1000,1ms  thrpt   30  21549.288 ± 2643.449  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                                   1000,100ms  thrpt   30  25414.551 ± 1287.725  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                               1000,100ms,SSL  thrpt   30   7341.885 ±  872.192  ops/ms

@pnowojski
Contributor

Thanks for the update @wsry, good that we have at least confirmed what the issue is here. And good job on figuring this out.

Let's maybe try to gather the data for the 0 exclusive buffers first. Since 1 exclusive buffer can cause performance regressions in some rare cases, we might prefer in the end to just advertise two main different setups: full throughput with 2 exclusive buffers and low throughput with 1/0 exclusive buffers. Anything in between would be for power users to self tune?

@wsry
Contributor Author

wsry commented Apr 1, 2020

@pnowojski I will update the results with 0 exclusive buffers (downstream only) later, after the deadlock problem is solved by FLINK-16641. For the upstream, at least 1 buffer is currently needed for each subpartition, so 0 exclusive buffers is not enough. (Maybe an easy way to fix it is to further split buffers into smaller ones if the number of available buffers is not enough.)

@pnowojski
Contributor

try to gather the data for the 0 exclusive buffers first

I meant 0 exclusive buffers for the receiver. Yes, sender needs at least 1 buffer :)

(Maybe an easy way to fix it is to further split buffers into smaller ones if the number of available buffers is not enough.)

Let's not go this way just now. An easy hotfix for a user is to configure smaller buffers.

@zhijiangW
Contributor

we might prefer in the end to just advertise two main different setups: full throughput with 2 exclusive buffers and low throughput with 1/0 exclusive buffers. Anything in between would be for power users to self tune?

Agree, I think it is the right way to go. After we solve the deadlock issue for 0 exclusive buffers on the receiver side, we can accept any settings from users and only give some explanation of how different settings would impact the performance.

Adjusting the default floating buffers might not make sense ATM, because we do not yet fully know how many floating buffers are really needed for different scales and partitioner modes to avoid impacting performance.

I guess in the end we still need a separate exclusive buffer configuration for the receiver side, different from the sender side. Then users can tune the exclusive buffers to 0 for the receiver side separately, but keep at least 1 exclusive buffer for the sender side. Even the default exclusive buffers for the receiver can be adjusted to 1 instead of 2, as we have already verified it would not impact performance.

@Jiayi-Liao
Contributor

Really impressive discussion! I've learnt a lot :).
