
[ISSUE-135][FOLLOWUP][Improvement][AQE] Assign adjacent partitions to the same ShuffleServer #307

Merged: 7 commits merged into apache:master on Nov 11, 2022

Conversation

@leixm (Contributor) commented Nov 7, 2022

What changes were proposed in this pull request?

Following up on issue #136, allocate adjacent partitions to the same ShuffleServer so that the number of ShuffleServer requests is minimized when the client calls getShuffleResultForMultiPart.

Why are the changes needed?

Brings some performance improvement.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@jerqi jerqi changed the title [Improvement][AQE] Assign adjacent partitions to the same ShuffleServer. [Improvement][AQE] Assign adjacent partitions to the same ShuffleServer Nov 7, 2022
@jerqi (Contributor) commented Nov 7, 2022

Spark usually starts tasks in waves. If we have 700 reduce tasks and 7 shuffle servers, we usually start tasks 0 - 99 first, and reduce partitions 0 - 99 would all be allocated to shuffle server 0. Won't that put too much pressure on shuffle server 0?
So I think the original implementation may still be useful in some situations. It's better to let users choose the strategy that suits them best: we should separate the mechanism from the strategy and let a configuration option decide which behavior we want.
Either a server-side or a client-side configuration is fine with me.

@leixm (Contributor, Author) commented Nov 7, 2022

> Spark usually starts tasks in waves. If we have 700 reduce tasks and 7 shuffle servers, we usually start tasks 0 - 99 first, and reduce partitions 0 - 99 would all be allocated to shuffle server 0. Won't that put too much pressure on shuffle server 0? So I think the original implementation may still be useful in some situations. It's better to let users choose the strategy that suits them best: we should separate the mechanism from the strategy and let a configuration option decide which behavior we want. Either a server-side or a client-side configuration is fine with me.

Good idea.

@jerqi (Contributor) commented Nov 8, 2022

We should allocate n continuous reduce partitions to one shuffle server to avoid too many tasks reading from the same shuffle server at once. If we have 700 reduce tasks and our concurrency is 100, we usually start tasks 0 - 99 first. If we allocate 10 continuous reduce partitions per shuffle server, shuffle server 0 will have reduce partitions [0, 9], [80, 89], ..., shuffle server 1 will have [10, 19], [90, 99], ..., shuffle server 2 will have [20, 29], [100, 109], ..., and so on.
We also need some performance tests.
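
A minimal sketch of the interleaved contiguous assignment described above (illustrative only; the class and method names are invented here and this is not the project's actual implementation):

```java
// Sketch: hand out blocks of contiguous reduce partitions to shuffle servers,
// rotating servers block by block, so one wave of concurrently running reducers
// spreads its reads across all servers instead of all hitting server 0.
public class ContinuousAssignmentSketch {

  // For each partition, return the index of the shuffle server it is assigned to.
  static int[] assign(int totalPartitions, int serverNum, int partitionsPerBlock) {
    int[] owner = new int[totalPartitions];
    for (int start = 0, block = 0; start < totalPartitions; start += partitionsPerBlock, block++) {
      int server = block % serverNum;                          // round-robin over blocks
      int end = Math.min(start + partitionsPerBlock, totalPartitions);
      for (int p = start; p < end; p++) {
        owner[p] = server;                                     // whole contiguous block -> same server
      }
    }
    return owner;
  }

  public static void main(String[] args) {
    // 700 partitions, 7 servers, blocks of 10: server 0 owns [0, 9], [70, 79], ...,
    // server 1 owns [10, 19], [80, 89], ..., so a wave of 100 tasks touches every server.
    int[] owner = assign(700, 7, 10);
    System.out.println("partition 85 -> server " + owner[85]); // prints: partition 85 -> server 1
  }
}
```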

@leixm (Contributor, Author) commented Nov 9, 2022

> We should allocate n continuous reduce partitions to one shuffle server to avoid too many tasks reading from the same shuffle server at once. If we have 700 reduce tasks and our concurrency is 100, we usually start tasks 0 - 99 first. If we allocate 10 continuous reduce partitions per shuffle server, shuffle server 0 will have reduce partitions [0, 9], [80, 89], ..., shuffle server 1 will have [10, 19], [90, 99], ..., and so on. We also need some performance tests.

How do we decide how many contiguous partitions to allocate per server, like the 10 mentioned above?

@leixm (Contributor, Author) commented Nov 9, 2022

In the example above, assigning 7 consecutive partitions might be optimal, for example [0, 6] assigned to server1 and [7, 13] to server2, but we do not know the number of concurrent tasks.

@leixm (Contributor, Author) commented Nov 9, 2022

We can read Spark's executor configuration, but the running application may not actually be able to acquire that many resources.

@jerqi (Contributor) commented Nov 9, 2022

> We can read Spark's executor configuration, but the running application may not actually be able to acquire that many resources.

If we use dynamic allocation, we can't know the number of executors. So I think we can introduce a configuration option first and let the user set that value. Similarly, ByteDance's Cloud Shuffle Service derives the task concurrency from an empirical formula; see https://github.com/bytedance/CloudShuffleService/blob/ef0ffb3f43f9f6e96af49629aed2a6ce61a6a2ab/spark-shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/css/CssShuffleManager.scala#L64
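
As a rough illustration of such an empirical estimate (the Spark property names below are real Spark configs, but this particular formula is an assumption for illustration, not necessarily what CSS or this PR computes):

```java
import org.apache.spark.SparkConf;

// Illustrative only: estimate how many reduce tasks may run at the same time.
// With dynamic allocation the live executor count is unknown, so a configured
// upper bound (spark.dynamicAllocation.maxExecutors) is used as a stand-in.
public class TaskConcurrencySketch {
  static int estimateTaskConcurrency(SparkConf conf) {
    int executorCores = conf.getInt("spark.executor.cores", 1);
    int taskCpus = conf.getInt("spark.task.cpus", 1);
    int executors = conf.getBoolean("spark.dynamicAllocation.enabled", false)
        ? conf.getInt("spark.dynamicAllocation.maxExecutors", 1)
        : conf.getInt("spark.executor.instances", 1);
    // concurrent tasks ~= executors * task slots per executor
    return executors * Math.max(1, executorCores / taskCpus);
  }
}
```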

@jerqi (Contributor) commented Nov 9, 2022

> In the example above, assigning 7 consecutive partitions might be optimal, for example [0, 6] assigned to server1 and [7, 13] to server2, but we do not know the number of concurrent tasks.

Maybe there is no real difference between 7 and 10. We need some performance tests here.

@codecov-commenter commented Nov 10, 2022

Codecov Report

Merging #307 (a4ceeba) into master (cf63eae) will increase coverage by 0.28%.
The diff coverage is 86.89%.

@@             Coverage Diff              @@
##             master     #307      +/-   ##
============================================
+ Coverage     60.73%   61.01%   +0.28%     
- Complexity     1462     1489      +27     
============================================
  Files           180      185       +5     
  Lines          9229     9314      +85     
  Branches        887      900      +13     
============================================
+ Hits           5605     5683      +78     
- Misses         3325     3326       +1     
- Partials        299      305       +6     
| Impacted Files | Coverage Δ |
|---|---|
| ...apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java | 0.00% <ø> (ø) |
| ...he/uniffle/client/impl/ShuffleWriteClientImpl.java | 21.23% <ø> (ø) |
| ...rg/apache/uniffle/client/util/RssClientConfig.java | 0.00% <ø> (ø) |
| ...java/org/apache/uniffle/common/util/Constants.java | 0.00% <ø> (ø) |
| ...he/uniffle/coordinator/CoordinatorGrpcService.java | 2.29% <0.00%> (-0.02%) ⬇️ |
| ...oordinator/PartitionBalanceAssignmentStrategy.java | 94.82% <60.00%> (-3.64%) ⬇️ |
| ...g/apache/uniffle/coordinator/CoordinatorUtils.java | 72.00% <77.41%> (+6.78%) ⬆️ |
| ...org/apache/spark/shuffle/RssSparkShuffleUtils.java | 54.54% <78.57%> (+5.33%) ⬆️ |
| ...niffle/coordinator/AbstractAssignmentStrategy.java | 87.09% <82.60%> (-4.34%) ⬇️ |
| .../coordinator/PerferDiffHostAssignmentStrategy.java | 92.85% <92.85%> (ø) |

... and 10 more


@leixm (Contributor, Author) commented Nov 10, 2022

> Maybe there is no real difference between 7 and 10. We need some performance tests here.

I will run some performance tests later.

@zuston (Member) commented Nov 10, 2022

> If we use dynamic allocation, we can't know the number of executors. So I think we can introduce a configuration option first and let the user set that value. Similarly, ByteDance's Cloud Shuffle Service derives the task concurrency from an empirical formula; see https://github.com/bytedance/CloudShuffleService/blob/ef0ffb3f43f9f6e96af49629aed2a6ce61a6a2ab/spark-shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/css/CssShuffleManager.scala#L64

Yes. This optimization has been applied in our internal Uniffle deployment and it works well.

@@ -101,6 +101,7 @@ This document will introduce how to deploy Uniffle coordinators.
|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times to read and write HDFS files|
|rss.coordinator.startup-silent-period.enabled|false|Enable the startup-silent-period to reject the assignment requests for avoiding partial assignments. To avoid service interruption, this mechanism is disabled by default. Especially it's recommended to use in coordinator HA mode when restarting single coordinator.|
|rss.coordinator.startup-silent-period.duration|20000|The waiting duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is enabled.|
|rss.coordinator.select.partition.strategy|AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND|There are two strategies for selecting partitions: ROUND and CONTINUOUS. ROUND allocates partitions to ShuffleServers in a round-robin fashion, while CONTINUOUS tries to allocate consecutive partitions to the same ShuffleServer.|
A reviewer (Contributor) commented on this change:

Could we tell users that this config option can optimize the performance of AQE?
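
For context, per the documented row above, enabling this behavior is a single coordinator-side setting: set rss.coordinator.select.partition.strategy to CONTINUOUS in the coordinator configuration; the default remains ROUND.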

@@ -103,6 +103,8 @@ These configurations are shared by all types of clients.
|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`.|
|<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
|<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x|
A reviewer (Contributor) commented on this change:

Could we tell users that this config option can optimize the performance of AQE?

@leixm (Contributor, Author) commented Nov 11, 2022

I reused the environment from #190 to compare the performance before issue #136, after issue #136, and after merging this PR.

Environment

Shuffle Server Num: 5
Shuffle Write: 48G
Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB --conf spark.dynamicAllocation.maxExecutors=200 --conf spark.executor.cores=6

We measure the performance of get_shuffle_result with the following metrics:

  • get_shuffle_result_times: the number of calls to the get_shuffle_result interface
  • get_shuffle_result_cost: the time consumed by the get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times: the number of calls to the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: the time consumed by the get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

| serverId | get_shuffle_result_times | get_shuffle_result_cost (ms) |
|---|---|---|
| Server1 | 1000 | 157614 |
| Server2 | 1000 | 426897 |
| Server3 | 1000 | 269488 |
| Server4 | 1000 | 906758 |
| Server5 | 1001 | 123217 |
| sum | 5001 | 1883974 |

After issue_136

| serverId | get_shuffle_result_for_multi_part_times | get_shuffle_result_for_multi_part_cost (ms) |
|---|---|---|
| Server1 | 833 | 870720 |
| Server2 | 833 | 260865 |
| Server3 | 834 | 333202 |
| Server4 | 833 | 90277 |
| Server5 | 835 | 94113 |
| sum | 4168 | 1649177 |

After this PR

| serverId | get_shuffle_result_for_multi_part_times | get_shuffle_result_for_multi_part_cost (ms) |
|---|---|---|
| Server1 | 168 | 40355 |
| Server2 | 167 | 43852 |
| Server3 | 167 | 98452 |
| Server4 | 167 | 91838 |
| Server5 | 168 | 25479 |
| sum | 837 | 299976 |

Summary

After this PR, the number of interface requests is reduced by 79.9% and the total time is reduced by 81.8%.
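(Computed against the post-issue_136 numbers above: 1 - 837 / 4168 ≈ 79.9% fewer requests and 1 - 299976 / 1649177 ≈ 81.8% less total time; against the pre-issue_136 baseline the reductions are roughly 83% and 84%.)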

@@ -53,6 +54,59 @@ public static int nextIdx(int idx, int size) {
return idx;
}

/**
* Assign multiple adjacent partitionRanges to several servers
* Suppose totalPartitionNum=52, partitionNumPerRange=2, serverNum=5, estimateTaskConcurrency=20
A reviewer (Contributor) commented on this change:

partitionNumPerRange should be 1; we will remove range partitions in the future. We can use that to simplify the logic. The current implementation is OK for me, too.

@@ -110,6 +110,7 @@ public void getShuffleAssignments(
final int replica = request.getDataReplica();
final Set<String> requiredTags = Sets.newHashSet(request.getRequireTagsList());
final int requiredShuffleServerNumber = request.getAssignmentShuffleServerNumber();
final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
A reviewer (Contributor) commented on this change:

What will the value be if an old client requests the server?

The PR author replied:

The value will be 0.

The reviewer replied:

Will it cause an unexpected result? If not, this feature is backward compatible.

The PR author replied:

If the estimateTaskConcurrency value is 0 and rss.coordinator.select.partition.strategy=CONTINUOUS, the assignment will be similar to the ROUND strategy; you can check CoordinatorUtils#generateRangesGroup.

The reviewer replied:

OK.

@jerqi (Contributor) left a review:

LGTM, thanks @leixm @zuston

@jerqi jerqi changed the title [Improvement][AQE] Assign adjacent partitions to the same ShuffleServer [ISSUE-135][FOLLOWUP][Improvement][AQE] Assign adjacent partitions to the same ShuffleServer Nov 11, 2022
@jerqi jerqi merged commit 84f781f into apache:master Nov 11, 2022
@leixm (Contributor, Author) commented Nov 11, 2022

> Yes. This optimization has been applied in our internal Uniffle deployment and it works well.

Maybe we can bring this feature to the community version as well and estimate the number of ShuffleServers needed from the number of concurrent tasks.

@jerqi (Contributor) commented Nov 11, 2022

> Maybe we can bring this feature to the community version as well and estimate the number of ShuffleServers needed from the number of concurrent tasks.

Would you like to contribute this feature and have @zuston help you review it?

@zuston (Member) commented Nov 11, 2022

I'm glad to review this feature if you want, @leixm.

@leixm (Contributor, Author) commented Nov 13, 2022

> I'm glad to review this feature if you want, @leixm.

Thank you, I will raise a PR for this feature.
