Conversation

@wsry
Contributor

@wsry wsry commented Feb 20, 2020

What is the purpose of the change

In the network benchmark (for example, the 1000-channel benchmark with 4 record writers), StreamNetworkBenchmarkEnvironment#createInputGate creates 1000 input gates with 4 input channels each, which doesn't make much sense. The expected setup is either 4 receivers, each with a single input gate of 1000 channels, or a single receiver with 4 input gates of 1000 channels each.
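
To make the intended topology concrete, here is a minimal, hypothetical Java sketch (for illustration only; InputGateSpec, oldSetup and fixedSetup are made-up names, not the actual StreamNetworkBenchmarkEnvironment code) contrasting the old setup with the intended one:

// Hypothetical sketch of the receiver-side setup; the real logic lives in
// StreamNetworkBenchmarkEnvironment#createInputGate and differs in detail.
import java.util.ArrayList;
import java.util.List;

public class ReceiverTopologySketch {

    /** Hypothetical description of one input gate: its index and how many channels it holds. */
    record InputGateSpec(int gateIndex, int numChannels) {}

    /** Old behavior (the bug): one gate per channel, each holding numWriters channels. */
    static List<InputGateSpec> oldSetup(int numWriters, int numChannels) {
        List<InputGateSpec> gates = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {           // 1000 gates ...
            gates.add(new InputGateSpec(i, numWriters));  // ... with 4 channels each
        }
        return gates;
    }

    /** Intended behavior: one gate per record writer, each holding numChannels channels. */
    static List<InputGateSpec> fixedSetup(int numWriters, int numChannels) {
        List<InputGateSpec> gates = new ArrayList<>();
        for (int i = 0; i < numWriters; i++) {            // 4 gates ...
            gates.add(new InputGateSpec(i, numChannels)); // ... with 1000 channels each
        }
        return gates;
    }

    public static void main(String[] args) {
        System.out.println("old:   " + oldSetup(4, 1000).size() + " gates");
        System.out.println("fixed: " + fixedSetup(4, 1000).size() + " gates");
    }
}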

Brief change log

  • The receiving InputGate setup logic of StreamNetworkBenchmarkEnvironment is changed

Verifying this change

This change is already covered by existing tests, such as StreamNetworkThroughputBenchmarkTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@wsry wsry changed the title [FLINK-14818] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. [FLINK-14818][benchmark] Fix receiving InputGate setup of StreamNetworkBenchmarkEnvironment. Feb 20, 2020
@wsry wsry requested a review from zhijiangW February 20, 2020 10:23
@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit fee93ce (Thu Feb 20 10:25:48 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Feb 20, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wsry wsry force-pushed the FLINK-14818 branch 3 times, most recently from cfc691c to 21b36d4 on February 20, 2020 16:08
Contributor

@zhijiangW zhijiangW left a comment


Thanks for this adjustment, @wsry.

I think it is reasonable to fix the invalid number of input gates in the benchmark, especially for scenarios that are sensitive to the buffer amount.

It generally looks good to me; I only left some inline comments. Besides that, I am curious how this change would affect the existing benchmark results. Could you provide the comparison results?

@zhijiangW
Contributor

@pnowojski I guess you might also be interested in taking a look. :)

@wsry wsry force-pushed the FLINK-14818 branch 4 times, most recently from bcba3ea to 8fe5e3d on February 21, 2020 07:09
@wsry
Contributor Author

wsry commented Feb 21, 2020

I have updated the PR. @zhijiangW
I will attach the new benchmark results soon.

Contributor

@pnowojski pnowojski left a comment


Good change :) I'm happy that you managed to solve this issue and the change is not that complex.

+1 to @zhijiangW's doubts about the benchmark code being fragile. If you can figure out how to make it more stable, that would be great, but if you decide it's too hard or not worth it, I'm fine with that.

I would also like to see how the results change (preferably from the benchmarking machine as well).

@zhijiangW
Contributor

Thanks for the updates @wsry; it generally looks good to me now, I just left two tiny comments.
As for the previous main concern, I left some suggestions here.

After you provide the micro-benchmark results, I will also execute them on our benchmark machine to confirm them.

@wsry
Contributor Author

wsry commented Feb 24, 2020

I ran the micro-benchmarks and found that some of the performance results improved while others got worse. The results are as follows:
Before the change:

Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17079.534 ±  830.532  ops/ms
StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    599.664 ±   13.325  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  45629.898 ± 1623.455  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9817.421 ±  216.075  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  25442.152 ±  968.340  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27944.285 ±  518.106  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7820.549 ±  895.862  ops/ms
StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     13.184 ±    0.093   ms/op

And after the change:

Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  22758.888 ±  613.743  ops/ms
StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    634.810 ±   14.375  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  32589.803 ± 3150.003  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9966.284 ±  296.523  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  26633.166 ± 1123.461  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  29845.823 ±  564.253  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   8740.787 ±  184.125  ops/ms
StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     13.047 ±    0.155   ms/op

@wsry wsry force-pushed the FLINK-14818 branch 2 times, most recently from 07c2811 to 8f263fe on February 24, 2020 09:55
@wsry
Contributor Author

wsry commented Feb 24, 2020

Thanks for the review and comments, @zhijiangW @pnowojski.
I have updated the PR according to the comments, and the benchmark results are also attached.

@wsry
Contributor Author

wsry commented Feb 25, 2020

I find that the number of TCP connections is one of the factors that influence the performance. If I change the connectionIndex from gateIndex to channelIndex, the number of connections remains unchanged compared with the behavior before this change (1000 TCP connections are set up per gate if the number of channels is 1000), and the performance results are as follows (a minimal sketch of the two options follows the results). From the results, we can see that the performance goes down a bit compared with the results before this change, which should be caused by the decrease of floating buffers. What do you think? Should we keep the number of connections unchanged in this PR? @zhijiangW @pnowojski

Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17345.574 ±  370.647  ops/ms
StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    608.881 ±   12.054  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  41732.518 ± 1109.436  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9689.525 ±  202.895  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  24106.705 ± 2952.364  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27509.665 ± 3246.965  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7691.287 ±  927.775  ops/ms
StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     12.758 ±    0.147   ms/op
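
For illustration only, a minimal sketch of the counting consequence of the two connectionIndex choices above; Strategy and distinctConnections are made-up names, and the sketch only assumes that channels sharing a connection index toward the same producer share one TCP connection, not Flink's actual ConnectionID wiring:

// Hypothetical sketch: how many distinct TCP connections each connectionIndex choice implies.
public class ConnectionIndexSketch {

    enum Strategy { PER_GATE, PER_CHANNEL }

    /** Distinct connection indexes a receiver would use toward one producer. */
    static int distinctConnections(Strategy strategy, int numGates, int channelsPerGate) {
        switch (strategy) {
            case PER_GATE:
                return numGates;        // connectionIndex = gateIndex, e.g. 4 connections
            case PER_CHANNEL:
                return channelsPerGate; // connectionIndex = channelIndex, e.g. 1000 connections (as before)
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println("gateIndex:    " + distinctConnections(Strategy.PER_GATE, 4, 1000));
        System.out.println("channelIndex: " + distinctConnections(Strategy.PER_CHANNEL, 4, 1000));
    }
}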

@zhijiangW
Contributor

Thanks for the updates, @wsry.

As for the issue of the connection index, I prefer keeping it at the previous value, so that we only measure the performance effect caused by the gate/channel amount.

If we want to adjust the connection amount via the connectionIndex, that should be separate work in the future, if confirmed necessary.

If the performance regression still exists after getting rid of the connection factor, caused by the reduction of total buffers as you analyzed above, I am a bit torn on whether we should increase the default floating buffer setting for these benchmarks in order to keep the performance the same as before. Otherwise we might have a misleading curve when tracing performance changes over time. WDYT? @pnowojski

@zhijiangW
Contributor

zhijiangW commented Feb 25, 2020

Also attaching the comparison results executed on the benchmark machine:

Baseline    
DataSkewStreamNetworkThroughputBenchmarkExecutor 51977.13475  
StreamNetworkBroadcastThroughputBenchmarkExecutor 1164.463375  
StreamNetworkThroughputBenchmarkExecutor 69553.22069 100,100ms
StreamNetworkThroughputBenchmarkExecutor 17430.11747 100,100ms,SSL
StreamNetworkThroughputBenchmarkExecutor 51019.51607 1000,1ms
StreamNetworkThroughputBenchmarkExecutor 55423.23278 1000,100ms
StreamNetworkThroughputBenchmarkExecutor 14924.65899 1000,100ms,SSL
Fewer gates with fewer connections
DataSkewStreamNetworkThroughputBenchmarkExecutor 33215.6767  
StreamNetworkBroadcastThroughputBenchmarkExecutor 1189.239694  
StreamNetworkThroughputBenchmarkExecutor 72450.69576 100,100ms
StreamNetworkThroughputBenchmarkExecutor 17018.46785 100,100ms,SSL
StreamNetworkThroughputBenchmarkExecutor 45829.91493 1000,1ms
StreamNetworkThroughputBenchmarkExecutor 52943.20546 1000,100ms
StreamNetworkThroughputBenchmarkExecutor 13762.96631 1000,100ms,SSL
Fewer gates with the same connections
DataSkewStreamNetworkThroughputBenchmarkExecutor 32342.12887  
StreamNetworkBroadcastThroughputBenchmarkExecutor 1136.44912  
StreamNetworkThroughputBenchmarkExecutor 75151.951622 100,100ms
StreamNetworkThroughputBenchmarkExecutor 16878.544466 100,100ms,SSL
StreamNetworkThroughputBenchmarkExecutor 47041.479598 1000,1ms
StreamNetworkThroughputBenchmarkExecutor 52384.517795 1000,100ms
StreamNetworkThroughputBenchmarkExecutor 13824.809088 1000,100ms,SSL

Contributor

@zhijiangW zhijiangW left a comment


Thanks for the update @wsry, LGTM now!
I will run it once more on our benchmark machine before merging.

@zhijiangW
Contributor

@wsry I also updated the benchmark results with the same connection amount in the link. The conclusion is that all the network throughput cases show a regression except for 100,100ms. I am not sure whether we can explain this corner case properly, but I guess it should not block merging since this should be the right way to go.

…nchmarkEnvironment

Before this change, in the network benchmark (for example, the 1000-channel benchmark with 4 record writers), StreamNetworkBenchmarkEnvironment#createInputGate was creating 1000 input gates with 4 input channels each, which doesn't make much sense. This commit changes that to a single receiver with 4 input gates, each with 1000 channels.

This is achieved by providing testing implementations of InputChannel that use the channel index, instead of the subpartition index, when requesting subpartitions from the ResultPartition. Thanks to that, a single ResultPartition with N subpartitions can be mapped to a single InputGate instance with N channels (see the sketch after the results below).

The change also influences the benchmark results: overall, the performance goes down a bit because of the decrease of floating buffers. The benchmark results before and after this change are as follows:

------------------------------------------------------------------Before----------------------------------------------------------------------
Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17079.534 ±  830.532  ops/ms
StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    599.664 ±   13.325  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  45629.898 ± 1623.455  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9817.421 ±  216.075  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  25442.152 ±  968.340  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27944.285 ±  518.106  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7820.549 ±  895.862  ops/ms
StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     13.184 ±    0.093   ms/op

------------------------------------------------------------------After-----------------------------------------------------------------------
Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  17345.574 ±  370.647  ops/ms
StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    608.881 ±   12.054  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  41732.518 ± 1109.436  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30   9689.525 ±  202.895  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  24106.705 ± 2952.364  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  27509.665 ± 3246.965  ops/ms
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   7691.287 ±  927.775  ops/ms
StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1                                         N/A   avgt   30     12.758 ±    0.147   ms/op
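
For illustration only, a minimal sketch of the channel-index-to-subpartition mapping described above; ResultPartitionStub and TestInputChannel are hypothetical stand-ins, not the PR's actual testing InputChannel implementations:

// Hypothetical sketch: each channel requests the subpartition matching its own channel index,
// so one partition with N subpartitions can back one gate with N channels.
public class ChannelMappingSketch {

    /** Hypothetical stand-in for a result partition with a fixed number of subpartitions. */
    record ResultPartitionStub(int numberOfSubpartitions) {
        String requestSubpartition(int subpartitionIndex) {
            return "data of subpartition " + subpartitionIndex;
        }
    }

    /** Hypothetical testing channel: maps channel index directly to subpartition index. */
    record TestInputChannel(int channelIndex, ResultPartitionStub partition) {
        String request() {
            return partition.requestSubpartition(channelIndex);
        }
    }

    public static void main(String[] args) {
        ResultPartitionStub partition = new ResultPartitionStub(1000);
        // One gate with N channels consuming one partition with N subpartitions.
        for (int channel = 0; channel < partition.numberOfSubpartitions(); channel++) {
            new TestInputChannel(channel, partition).request();
        }
        System.out.println("mapped 1 partition to 1 gate with "
                + partition.numberOfSubpartitions() + " channels");
    }
}
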
@wsry
Contributor Author

wsry commented Feb 27, 2020

@flinkbot run travis re-run the last Travis build

@wsry
Contributor Author

wsry commented Feb 27, 2020

@flinkbot run travis

@zhijiangW
Contributor

The failing case in Travis is unrelated to this PR, merging.

@zhijiangW zhijiangW merged commit 92253f2 into apache:master Feb 27, 2020