
[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed #293

Merged

merged 16 commits into apache:master
Nov 5, 2022

Conversation

zuston
Member

@zuston zuston commented Nov 1, 2022

What changes were proposed in this pull request?

Introduce a new mechanism to determine how to write data to file, including directly append or append after sort.

Why are the changes needed?

In our internal Uniffle deployment, 200+ shuffle-servers are in service. A single shuffle-server uses 4 SATA SSDs as localfile storage, and its max network bandwidth is limited to 5G/s. The storageType of the shuffle-server is MEMORY_LOCALFILE.

After monitoring the read_data_rate metric, I found it would consistently hit the max network bandwidth, even though few apps were running at the time, and only a single disk's usage was at 100%.

After digging into the shuffle-server's log, I found that almost all requests carried the same AppId and the same partition, fetching shuffle data from the same partition data file. This indicates the high disk utilization was caused by read hotspots.

It turned out that this app's shuffle read was optimized by the AQE skew-data split, which puts the Uniffle shuffle-server under heavy network and disk-IO pressure.

After identifying this, I briefly analyzed the performance of historical tasks across different shuffle services.

In the current implementation, one partition's buffer is flushed to disk once its size reaches the 64M threshold, while the Spark/MR Uniffle client fetches data in batches of 14M (the default value). That means that for one buffer of one partition, the client may need 5 network round trips with the shuffle-server if the MapIds in the data are relatively scattered.

To solve this problem, we can sort the 64M buffer's data by MapId before flushing. Ideally, the Uniffle client then needs only a single read per buffer.
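The idea above can be sketched roughly as follows (hypothetical, simplified types, not the actual Uniffle classes): sort a partition buffer's blocks by taskAttemptId (MapId) before they are appended to the data file, so each map task's blocks land contiguously.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sort buffered blocks by taskAttemptId (MapId)
// before the buffer is flushed, so each map task's blocks are
// contiguous in the partition data file.
public class LocalOrderFlushSketch {
    // Stand-in for a shuffle block held in the in-memory buffer.
    static class Block {
        final long taskAttemptId;
        final byte[] data;
        Block(long taskAttemptId, byte[] data) {
            this.taskAttemptId = taskAttemptId;
            this.data = data;
        }
    }

    // Return the blocks in taskAttemptId order; the caller appends them
    // to the data file in this order instead of arrival order.
    static List<Block> sortBeforeFlush(List<Block> buffered) {
        List<Block> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingLong(b -> b.taskAttemptId));
        return sorted;
    }
}
```

With the blocks sorted, a reader asking for one MapId range touches one contiguous byte range instead of several scattered fetches.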

Does this PR introduce any user-facing change?

  1. Introduces a shuffle-data distribution type, NORMAL or LOCAL_ORDER, which leaves room for other implementations such as GLOBAL_ORDER.
  2. Makes the segment split strategy a general interface covering the data distribution types above.
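As a rough illustration of point 2 (all names here are hypothetical stand-ins, not the real interfaces from the patch), the split strategy can hide behind one small interface, with the NORMAL distribution keeping the original fixed-size split:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the splitter abstraction: one interface, with
// the NORMAL distribution type keeping the original fixed-size split.
public class SplitterSketch {
    // Offset/length view of one read unit in the partition data file.
    static class Segment {
        final long offset;
        final long length;
        Segment(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    interface SegmentSplitter {
        // blockLengths: lengths of consecutive blocks from the index file.
        List<Segment> split(long[] blockLengths);
    }

    // Fixed-size split: cut a new segment once the accumulated size
    // reaches readBufferSize, mirroring the original split behavior.
    static SegmentSplitter fixedSize(long readBufferSize) {
        return blockLengths -> {
            List<Segment> segments = new ArrayList<>();
            long start = 0;
            long acc = 0;
            long offset = 0;
            for (long len : blockLengths) {
                acc += len;
                offset += len;
                if (acc >= readBufferSize) {
                    segments.add(new Segment(start, acc));
                    start = offset;
                    acc = 0;
                }
            }
            if (acc > 0) {
                segments.add(new Segment(start, acc));
            }
            return segments;
        };
    }
}
```

A LOCAL_ORDER splitter would implement the same interface but also consult the taskAttemptId range recorded in the index.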

How was this patch tested?

  1. UTs
  2. Spark Tests on offline hadoop cluster

Benchmark

Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). All values of column k1 are the same (value = 10)

Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). It has only one record with k2 = 10

Environment: 100 executors(1core2g)
SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")

  • Uniffle without patch: cost 12min
  • Uniffle with patch: cost 4min

Reference

  1. Design doc: https://docs.google.com/document/d/1G0cOFVJbYLf2oX1fiadh7zi2M6DlEcjTQTh4kSkb0LA/edit?usp=sharing

@codecov-commenter

codecov-commenter commented Nov 2, 2022

Codecov Report

Merging #293 (17f3619) into master (7d4428e) will increase coverage by 0.65%.
The diff coverage is 70.05%.

@@             Coverage Diff              @@
##             master     #293      +/-   ##
============================================
+ Coverage     60.10%   60.76%   +0.65%     
- Complexity     1413     1454      +41     
============================================
  Files           175      179       +4     
  Lines          9082     9201     +119     
  Branches        872      882      +10     
============================================
+ Hits           5459     5591     +132     
+ Misses         3331     3316      -15     
- Partials        292      294       +2     
Impacted Files Coverage Δ
...apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java 0.00% <0.00%> (ø)
...e/uniffle/client/factory/ShuffleClientFactory.java 0.00% <0.00%> (ø)
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 21.23% <ø> (ø)
...client/request/CreateShuffleReadClientRequest.java 0.00% <0.00%> (ø)
...uniffle/common/segment/SegmentSplitterFactory.java 0.00% <0.00%> (ø)
.../java/org/apache/uniffle/common/util/RssUtils.java 63.07% <ø> (-5.98%) ⬇️
...pache/uniffle/server/ShuffleServerGrpcService.java 0.86% <0.00%> (-0.01%) ⬇️
...uniffle/storage/factory/ShuffleHandlerFactory.java 0.00% <0.00%> (ø)
...handler/impl/LocalFileQuorumClientReadHandler.java 0.00% <0.00%> (ø)
...orage/request/CreateShuffleReadHandlerRequest.java 0.00% <0.00%> (ø)
... and 21 more


@zuston zuston changed the title [WIP][Improvement][AQE] Sort MapId before the data are flushed [Improvement][AQE] Sort MapId before the data are flushed Nov 3, 2022
@jerqi jerqi linked an issue Nov 3, 2022 that may be closed by this pull request
@jerqi jerqi changed the title [Improvement][AQE] Sort MapId before the data are flushed [ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed Nov 3, 2022
@jerqi
Contributor

jerqi commented Nov 4, 2022

cc @leixm. Maybe you're interested in this feature.

import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;

public class LocalOrderSegmentSplitter implements SegmentSplitter {
Contributor

Our blockId carries the taskAttemptId information; we usually use the blockId to filter the data.

Member Author

Let me think it over.

Contributor

@jerqi jerqi Nov 4, 2022

What's the difference between LocalOrderSegmentSplitter and FixedSizeSegmentSplitter?

Member Author

I have removed the startMapId and endMapId.

What's the difference between LocalOrderSegmentSplitter and FixedSizeSegmentSplitter?

LocalOrderSegmentSplitter produces segments according to the local order of the index file.

FixedSizeSegmentSplitter keeps consistent with the original segment split logic.
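A rough sketch of that local-order split (hypothetical, simplified types, not the real class): walk the index entries in file order, keep only blocks whose taskAttemptId falls in the requested range, and cut a new segment when the read buffer size is reached or file contiguity breaks, so every segment covers one continuous byte range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a local-order split: only blocks whose
// taskAttemptId is inside [startMapId, endMapId) are kept, and a
// segment is closed on a size threshold or any break in file
// contiguity, so every segment covers one continuous byte range.
public class LocalOrderSplitSketch {
    static class IndexEntry {
        final long offset;
        final long length;
        final long taskAttemptId;
        IndexEntry(long offset, long length, long taskAttemptId) {
            this.offset = offset;
            this.length = length;
            this.taskAttemptId = taskAttemptId;
        }
    }

    static class Segment {
        final long offset;
        long length;
        Segment(long offset) {
            this.offset = offset;
        }
    }

    static List<Segment> split(List<IndexEntry> index, long startMapId,
                               long endMapId, long readBufferSize) {
        List<Segment> segments = new ArrayList<>();
        Segment current = null;
        for (IndexEntry e : index) {
            if (e.taskAttemptId < startMapId || e.taskAttemptId >= endMapId) {
                // Unwanted block: close the current segment so segments
                // never span a filtered (discontinuous) region.
                current = null;
                continue;
            }
            boolean contiguous = current != null
                && e.offset == current.offset + current.length;
            if (current == null || current.length >= readBufferSize || !contiguous) {
                current = new Segment(e.offset);
                segments.add(current);
            }
            current.length += e.length;
        }
        return segments;
    }
}
```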

Member Author

In the future, we could introduce a global-order strategy, which can be implemented more easily on top of the current abstraction.

fileOffset = -1;
}

if (expectTaskIds.contains(taskAttemptId)) {
Contributor

The data of a ShuffleDataSegment seems to be continuous. It will be wrong if you filter out one record.

Contributor

This point needs to be ensured.

Member Author

Could you describe this in more detail?

Contributor

@jerqi jerqi Nov 4, 2022

We have 5 blocks: block 1, block 2, block 3, block 4, block 5. Block 3 is filtered out. Block 1, block 2, block 4, block 5 can't form one ShuffleDataSegment; the blocks in one ShuffleDataSegment must be continuous. You can see

bs.getOffset(), bs.getLength()), bs.getUncompressLength());
for details.

Contributor

It sounds like we still need to use startMapId to get the data.

startMapId would also filter the data. It's the same.

Contributor

It seems the case I was worried about won't happen because our data is sorted. But we still need to check this point again and add some comments.

Member Author

This splitter is bound to LOCAL_ORDER. I don't quite understand what you mean.

Contributor

@jerqi jerqi Nov 4, 2022

ExpectTaskId should be a range between startMapId and endMapId. If a blockId A isn't in this range, there won't be any other blockId after blockId A in this ShuffleDataSegment, because data is locally ordered within the ShuffleDataSegment. So there won't be discontinuous blockIds in a ShuffleDataSegment; discontinuous blockIds in a ShuffleDataSegment aren't permitted in our system. You can see the related logic for details in

bs.getOffset(), bs.getLength()), bs.getUncompressLength());

Member Author

Got it. This won't happen: the index data will be split into multiple ordered parts, and then the segments in the range are obtained via expectTaskIds.

Maybe I need to add more comments to this class header.

Member Author

On reflection, the discontinuity problem you mentioned will only occur if a block's taskId is not in this reader's partition but is in the expectTaskIds range of this index file. For example, take an index file whose blocks are as follows:

Block-A (partition: 1, taskId: 1)
Block-B (partition: 2, taskId: 1)
Block-C (partition: 1, taskId: 2)

If the reader wants the partition-1 data and the taskId range is [1, 3), Block-B will be filtered out, which causes the discontinuity problem.

I think we need to sort the index file by two keys: first partitionId, then taskId.
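That two-key ordering could look roughly like this (hypothetical sketch, not the patch's code): sort index records by partitionId first and taskId second, so each partition's blocks are contiguous and internally ordered.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch of the proposed two-key ordering: sort index
// records by partitionId first, then taskId, so all blocks of one
// partition are contiguous and internally ordered by taskId.
public class TwoKeyIndexSortSketch {
    static class IndexRecord {
        final int partitionId;
        final long taskId;
        IndexRecord(int partitionId, long taskId) {
            this.partitionId = partitionId;
            this.taskId = taskId;
        }
    }

    static IndexRecord[] sort(IndexRecord[] records) {
        IndexRecord[] out = records.clone();
        Arrays.sort(out, Comparator
            .comparingInt((IndexRecord r) -> r.partitionId)
            .thenComparingLong(r -> r.taskId));
        return out;
    }
}
```

With this ordering, a block from another partition (like Block-B above) can no longer sit between two partition-1 blocks, so the filtered segments stay continuous.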

Contributor

@jerqi jerqi left a comment

LGTM except for some minor points.

Contributor

@jerqi jerqi left a comment

LGTM, waiting for CI. Great work! Thanks @zuston

@jerqi jerqi merged commit 74949f5 into apache:master Nov 5, 2022
@leixm
Contributor

leixm commented Nov 7, 2022

cc @leixm. Maybe you're interested in this feature.

It's a pity that I was busy and didn't participate in the review.

@zuston
Member Author

zuston commented Nov 8, 2022

cc @leixm. Maybe you're interested in this feature.

It's a pity that I was busy and didn't participate in the review.

Looking forward to your ideas on this optimization.

Successfully merging this pull request may close these issues.

[Improvement][AQE] Sort MapId before the data are flushed