[Improvement][AQE] Support getting memory data skip by upstream task ids #358
Conversation
I have proposed a draft implementation. @jerqi @xianjingfeng, if you have time, please take a look.
client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
Should we give this PR a performance test?
Codecov Report
@@ Coverage Diff @@
## master #358 +/- ##
============================================
+ Coverage 58.01% 58.56% +0.55%
- Complexity 1361 1586 +225
============================================
Files 171 193 +22
Lines 9006 10881 +1875
Branches 787 953 +166
============================================
+ Hits 5225 6373 +1148
- Misses 3449 4132 +683
- Partials 332 376 +44
storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
A performance test has been attached in the description. It works well.
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -119,6 +120,9 @@ public RssShuffleReader(
    this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
    this.rssConf = rssConf;
    this.dataDistributionType = dataDistributionType;
    // This mechanism of expectedTaskIdsBitmap filter is to filter out the most of data,
    // especially for AQE skew optimization
    this.expectedTaskIdsBitmapFilterEnable = mapEndIndex == Integer.MAX_VALUE ? false : true;
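The review thread below converges on checking both ends of the map-index range rather than only `mapEndIndex`. A minimal sketch of that revised predicate follows; the class and method names are illustrative, not the actual `RssShuffleReader` code:

```java
// Sketch of the map-index-range check discussed in review; names are
// hypothetical, not the real Uniffle fields.
public class TaskIdFilterCheck {
    // AQE skew optimization splits a partition read into a sub-range of map
    // task indexes. A full range [0, Integer.MAX_VALUE) means no split was
    // applied, so the expectedTaskIds bitmap filter can be skipped.
    static boolean expectedTaskIdsBitmapFilterEnable(int startMapIndex, int mapEndIndex) {
        boolean fullRange = startMapIndex == 0 && mapEndIndex == Integer.MAX_VALUE;
        return !fullRange;
    }

    public static void main(String[] args) {
        // Full range: filter disabled.
        if (expectedTaskIdsBitmapFilterEnable(0, Integer.MAX_VALUE)) throw new AssertionError();
        // Skew-split sub-range: filter enabled.
        if (!expectedTaskIdsBitmapFilterEnable(3, 7)) throw new AssertionError();
        // The last-reduce-partition case raised in review: [n, Integer.MAX_VALUE).
        if (!expectedTaskIdsBitmapFilterEnable(5, Integer.MAX_VALUE)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Checking only `mapEndIndex` would wrongly disable the filter for a skew-split range ending at `Integer.MAX_VALUE`, which is the case the reviewer points out next.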
If this is the last reduce partition, may the range of mapId be [n, Integer.MAX_VALUE]?
So do we need to use startMapIndex == 0 and mapEndIndex == Integer.MAX_VALUE to judge?
Yes, I think it's more accurate.
Updated.
@xianjingfeng Do you have another suggestion?
LGTM
LGTM, thanks @zuston @xianjingfeng. I will add @xianjingfeng as this PR's co-author, because this PR is based on PR #294.
cc @bin41215 You may be interested in this PR.
What changes were proposed in this pull request?
Support skipping in-memory shuffle data by upstream task ids when reading.
Why are the changes needed?
In the current codebase, when shuffle-server memory is large and the
job is optimized by the AQE skew rule, multiple readers of the same
partition will fetch shuffle data from the same shuffle server.
To avoid reading unused localfile/HDFS data, PR #137
introduced the LOCAL_ORDER mechanism to filter out most of the data.
But MEMORY storage still suffers from this, so this PR avoids
reading unused data for each reader by filtering with an
expectedTaskIds bitmap.
This optimization is only enabled when the AQE skew optimization is applied.
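The filtering idea described above can be sketched as follows. This is an illustrative example only: the ShuffleBlock record, readMemoryData method, and the use of java.util.BitSet are stand-ins for Uniffle's actual block metadata and bitmap type, chosen to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch: when a reader serves only a sub-range of upstream map
// tasks (an AQE skew split), blocks buffered in server memory whose producing
// task id is outside the expected set are skipped instead of shipped.
public class MemoryDataSkip {
    record ShuffleBlock(long taskAttemptId, byte[] data) {}

    static List<ShuffleBlock> readMemoryData(List<ShuffleBlock> buffered, BitSet expectedTaskIds) {
        List<ShuffleBlock> result = new ArrayList<>();
        for (ShuffleBlock block : buffered) {
            // Skip data produced by map tasks this reader is not responsible for.
            if (expectedTaskIds.get((int) block.taskAttemptId())) {
                result.add(block);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<ShuffleBlock> buffered = List.of(
            new ShuffleBlock(0, new byte[]{1}),
            new ShuffleBlock(1, new byte[]{2}),
            new ShuffleBlock(2, new byte[]{3}));
        BitSet expected = new BitSet();
        expected.set(0);
        expected.set(2); // this reader covers map tasks 0 and 2 only
        List<ShuffleBlock> got = readMemoryData(buffered, expected);
        if (got.size() != 2) throw new AssertionError();
        System.out.println("filtered " + (buffered.size() - got.size())
            + " of " + buffered.size() + " blocks");
    }
}
```

The point of the design is that the filter runs on the server side, so skipped blocks never cross the network, which is where the benchmark savings below come from.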
Does this PR introduce any user-facing change?
No
How was this patch tested?
Benchmark
Table
Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)).
All rows have the same k1 value (value = 10).
Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)).
It has only one record with k2 = 10.
Env
Spark Resource Profile: 10 executors (1 core, 2g each)
Shuffle-server Environment: 10 shuffle servers, 10g for buffer read and write.
Spark Shuffle Client Config: storage type: MEMORY_LOCALFILE with LOCAL_ORDER
SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")
Result
ESS: cost 3min
Uniffle without patch: cost 11.6min (2.1 + 9.5)
Uniffle with patch: cost 3.5min (2.1 + 1.4)