
[Improvement][AQE] Sort MapId before the data are flushed #137

Closed
jerqi opened this issue Aug 6, 2022 · 17 comments · Fixed by #293


jerqi commented Aug 6, 2022

When we use AQE, we need to use the mapId to filter out the data we don't need. If we sort by MapId before the data are flushed, we can split the data into segments; if a segment doesn't contain any data we want to read, we can skip it. So if the data is sorted by mapId, we can filter out more data and improve performance.
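The segment-skipping idea can be sketched roughly as follows. These are hypothetical names, not Uniffle's actual classes: a reader that only needs certain task ids keeps a segment only when at least one of its blocks was produced by a wanted task.

```java
import java.util.*;

// Hypothetical sketch (not Uniffle's real classes) of segment-level
// filtering: skip whole segments whose blocks contain none of the
// taskAttemptIds the reader wants.
public class SegmentFilter {
    // A segment groups several blocks; each block carries the
    // taskAttemptId of the map task that produced it.
    record Segment(List<Long> blockTaskIds) {}

    // Keep only segments that contain at least one wanted taskAttemptId.
    static List<Segment> filter(List<Segment> segments, Set<Long> wantedTaskIds) {
        List<Segment> kept = new ArrayList<>();
        for (Segment s : segments) {
            for (long id : s.blockTaskIds()) {
                if (wantedTaskIds.contains(id)) {
                    kept.add(s);
                    break;
                }
            }
        }
        return kept;
    }
}
```

The better the blocks of one task are clustered inside few segments, the more segments this filter can drop — which is why sorting by mapId before flushing helps.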


zuston commented Aug 28, 2022

Do we need to sort the data of one partition by MapId before flushing for all jobs? I think not. This would bring unnecessary cost to non-AQE-optimized stages. Maybe we could sort the partition data by MapId only when AQE's specified ShufflePartitionSpec is applied for the first time.


jerqi commented Aug 28, 2022

You are right.


zuston commented Oct 9, 2022

Have you implemented this in your internal version? If not, I'm interested in working on it. @jerqi


jerqi commented Oct 9, 2022

No. You can go ahead.

@zuston zuston changed the title [Improvement][Aqe] Sort MapId before the data are flushed [Improvement][AQE] Sort MapId before the data are flushed Oct 9, 2022
@zuston zuston self-assigned this Oct 9, 2022

zuston commented Oct 28, 2022

@jerqi

jerqi commented Oct 28, 2022

It's better to sort by MapId before the data are flushed. It won't bring much cost to non-AQE-optimized stages.


zuston commented Oct 28, 2022

> It's better to sort by MapId before the data are flushed. It won't bring much cost to non-AQE-optimized stages.

Does the data need to be sorted by mapId?


jerqi commented Oct 28, 2022

> It's better to sort by MapId before the data are flushed. It won't bring much cost to non-AQE-optimized stages.
>
> Does the data need to be sorted by mapId?

Yes, and we only need local order. With local order, we can filter out much of the data effectively.


zuston commented Oct 28, 2022

> It's better to sort by MapId before the data are flushed. It won't bring much cost to non-AQE-optimized stages.
>
> Does the data need to be sorted by mapId?
>
> Yes, and we only need local order. With local order, we can filter out much of the data effectively.

Emm... I remember you preferred to sort only the index file instead of the data file, as we discussed in the offline meeting. Did I misunderstand you?


jerqi commented Oct 28, 2022

> It's better to sort by MapId before the data are flushed. It won't bring much cost to non-AQE-optimized stages.
>
> Does the data need to be sorted by mapId?
>
> Yes, and we only need local order. With local order, we can filter out much of the data effectively.
>
> Emm... I remember you preferred to sort only the index file instead of the data file, as we discussed in the offline meeting. Did I misunderstand you?

Let me give an example:
We have three buffers to flush, containing a taskId 1 block, a taskId 2 block, and a taskId 3 block. We sort them into the order taskId 1 block, taskId 2 block, taskId 3 block, and then flush them to disk. Next we receive a taskId 2 block, a taskId 6 block, and a taskId 1 block; we sort and flush them as well, so the data on disk is now:
taskId 1 block, taskId 2 block, taskId 3 block, taskId 1 block, taskId 2 block, taskId 6 block.
The data only has local order.
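The flush-time sorting in this example can be sketched as follows (hypothetical names, not Uniffle's actual classes): each in-memory buffer is sorted by taskAttemptId just before it is flushed, so every flushed run is internally ordered even though the file as a whole is not globally sorted.

```java
import java.util.*;

// Hypothetical sketch of the LOCAL_ORDER idea: sort one buffer's blocks
// by taskAttemptId before flushing; the caller then appends this sorted
// run to the partition's data file.
public class LocalOrderFlush {
    record Block(long taskAttemptId, byte[] data) {}

    static List<Block> sortBeforeFlush(List<Block> buffer) {
        List<Block> sorted = new ArrayList<>(buffer);
        // Stable sort keeps the relative order of blocks from the same task.
        sorted.sort(Comparator.comparingLong(Block::taskAttemptId));
        return sorted;
    }
}
```

Applied to the example above, the second buffer (taskId 2, 6, 1) becomes the run taskId 1, 2, 6, which is appended after the first run — local order per run, no global order.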


zuston commented Oct 28, 2022

> taskId-1 block, taskId-2 block, taskId-3 block, taskId-1 block, taskId-2 block, taskId-6 block.

If one reader wants the data from taskId=1, it still has to read the data segment spanning taskId-1 block, taskId-2 block, taskId-3 block, taskId-1 block. The taskId-2 and taskId-3 blocks are unnecessary for this reader. Right?


jerqi commented Oct 28, 2022

> taskId-1 block, taskId-2 block, taskId-3 block, taskId-1 block, taskId-2 block, taskId-6 block.
>
> If one reader wants the data from taskId=1, it still has to read the data segment spanning taskId-1 block, taskId-2 block, taskId-3 block, taskId-1 block. The taskId-2 and taskId-3 blocks are unnecessary for this reader. Right?

Yes.


zuston commented Oct 28, 2022

This looks ineffective, and it's the same as the original block filter.


jerqi commented Oct 28, 2022

> This looks ineffective, and it's the same as the original block filter.

Actually, considering random IO, it costs about the same to read 3 records as 2 records.


zuston commented Oct 28, 2022

> This looks ineffective, and it's the same as the original block filter.
>
> Actually, considering random IO, it costs about the same to read 3 records as 2 records.

Yes. According to the problems mentioned in the proposal's design motivation section, the key point is that a lot of data is read multiple times, depending on the split number produced by AQE. From this view, we should sort the data file.


jerqi commented Oct 28, 2022

> This looks ineffective, and it's the same as the original block filter.
>
> Actually, considering random IO, it costs about the same to read 3 records as 2 records.
>
> Yes. According to the problems mentioned in the proposal's design motivation section, the key point is that a lot of data is read multiple times, depending on the split number produced by AQE. From this view, we should sort the data file.

We don't need global order; local order should be enough.


zuston commented Nov 3, 2022

#293

@jerqi jerqi closed this as completed in #293 Nov 5, 2022
jerqi pushed a commit that referenced this issue Nov 29, 2022
…ids (#358)

### What changes were proposed in this pull request?

Support getting memory data skip by upstream task ids

### Why are the changes needed?

In the current codebase, when the shuffle-server memory is large and the
job is optimized by the AQE skew rule, multiple readers of the same
partition will get the shuffle data from the same shuffle-server.

To avoid reading unused localfile/HDFS data, the PR for #137
introduced the LOCAL_ORDER mechanism to filter out most of the data.

But the MEMORY storage still suffers from this. So this PR avoids
reading unused data for one reader by filtering with an
expectedTaskIds bitmap.

And this optimization is only enabled when AQE skew is applied.
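The bitmap filtering described above can be sketched as follows (hypothetical names, not the PR's actual classes; a real shuffle server would use a Roaring bitmap, and a `java.util.BitSet` stands in here):

```java
import java.util.*;

// Hypothetical sketch of filtering buffered in-memory blocks with an
// expectedTaskIds bitmap: only blocks produced by the reader's expected
// upstream map tasks are returned.
public class MemoryDataFilter {
    record Block(int taskAttemptId, byte[] data) {}

    static List<Block> readMemoryData(List<Block> buffered, BitSet expectedTaskIds) {
        List<Block> result = new ArrayList<>();
        for (Block b : buffered) {
            // Skip blocks whose producing task is not in the bitmap.
            if (expectedTaskIds.get(b.taskAttemptId())) {
                result.add(b);
            }
        }
        return result;
    }
}
```

Unlike the LOCAL_ORDER file filter, this check runs per block in memory, so it needs no sorting at all.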

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
1. UTs

### Benchmark

#### Table
Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). 
And all columns of k1 have the same value (value = 10)

Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)).
And it has only one record with k2=10

#### Env
Spark Resource Profile: 10 executors(1core2g)
Shuffle-server Environment: 10 shuffle servers, 10g for buffer read and write. 
Spark Shuffle Client Config: storage type: MEMORY_LOCALFILE with LOCAL_ORDER
SQL: `spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")`

#### Result
__ESS__: cost `3min`
__Uniffle without patch__: cost `11.6min` (2.1 + 9.5)
__Uniffle with patch__: cost `3.5min` (2.1 + 1.4)

Co-authored-by: xianjingfeng <583872483@qq.com>