
[Improvement] Skip blocks when read from memory #294

Merged — 8 commits merged into apache:master on Dec 11, 2022

Conversation

xianjingfeng (Member) commented Nov 2, 2022

What changes were proposed in this pull request?

Skip blocks that are not in the expected blockId range when reading from memory.

Why are the changes needed?

1. If we use AQE, every task will read data from all partitions.
2. If the data on the first shuffle server is incomplete, we need to read from another server once #276 is merged.

Both of the above situations lead to reading redundant data from the shuffle server.

Does this PR introduce any user-facing change?

Yes. Set rss.client.read.block.skip.strategy to BLOCKID_RANGE to enable this feature.

How was this patch tested?

Unit tests already added.
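For reference, a sketch of how this option might be enabled from a Spark client. The `spark.rss.` prefix is the conventional client-side prefix for rss options, but the exact prefix, job class, and jar name here are assumptions for illustration, not confirmed by this thread:

```shell
# Hypothetical example: enable the BLOCKID_RANGE skip strategy for a Spark job.
# Verify the config prefix against your uniffle client version before relying on it.
spark-submit \
  --conf spark.rss.client.read.block.skip.strategy=BLOCKID_RANGE \
  --class com.example.MyJob \
  my-job.jar
```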

codecov-commenter commented Nov 2, 2022

Codecov Report

Merging #294 (72195de) into master (884921b) will decrease coverage by 0.52%.
The diff coverage is 50.94%.

@@             Coverage Diff              @@
##             master     #294      +/-   ##
============================================
- Coverage     59.24%   58.71%   -0.53%     
- Complexity     1456     1614     +158     
============================================
  Files           180      194      +14     
  Lines          9631    11024    +1393     
  Branches        835      971     +136     
============================================
+ Hits           5706     6473     +767     
- Misses         3577     4167     +590     
- Partials        348      384      +36     
Impacted Files Coverage Δ
.../java/org/apache/hadoop/mapreduce/RssMRConfig.java 23.07% <ø> (ø)
...pache/hadoop/mapreduce/task/reduce/RssShuffle.java 0.00% <0.00%> (ø)
...e/uniffle/client/factory/ShuffleClientFactory.java 0.00% <0.00%> (ø)
...client/request/CreateShuffleReadClientRequest.java 0.00% <0.00%> (ø)
...rg/apache/uniffle/client/util/RssClientConfig.java 0.00% <ø> (ø)
...a/org/apache/uniffle/common/BlockSkipStrategy.java 0.00% <0.00%> (ø)
...pache/uniffle/server/ShuffleServerGrpcService.java 0.81% <0.00%> (-0.01%) ⬇️
.../org/apache/uniffle/server/ShuffleTaskManager.java 77.22% <0.00%> (ø)
...uniffle/storage/factory/ShuffleHandlerFactory.java 0.00% <0.00%> (ø)
.../storage/handler/impl/MemoryClientReadHandler.java 0.00% <0.00%> (ø)
... and 87 more


zuston (Member) commented Nov 3, 2022

Thanks for proposing this PR; overall it will benefit large-memory shuffle servers the most.

After a brief look, I have a question: won't processBlockIds and expectBlockIds take a lot of memory? They may add extra overhead, especially for frequent getInMemoryData calls. Can we send them only once for the same MemoryClientReadHandler?

xianjingfeng (Member, Author) replied:

> Can we send them only once for the same MemoryClientReadHandler?

Good idea. I will try.

frankliee (Contributor) commented Nov 4, 2022

I have two suggestions:

  1. Use a bloom filter or a bitmap instead of the complete processBlockIds and expectBlockIds, because these blockId sets can be very large while the needed data can be small.

  2. Keep the unskipped code path for clients on older versions.

xianjingfeng (Member, Author) replied:

> 1. Use a bloom filter or a bitmap instead of the complete processBlockIds and expectBlockIds.

A bloom filter is not suitable here, because it can only guarantee that an element does not exist; a hit may be a false positive. For the bitmap case, see RssUtils.serializeBitMap.

frankliee (Contributor) replied:

> A bloom filter is not suitable here, because it can only guarantee that an element does not exist.

Maybe we do not need precise skipping? We could use the set (expectBlockIds - processBlockIds) to build a bloom filter. Blocks that do not hit the bloom filter can then be skipped safely.
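The coarse-grained idea above can be sketched with a toy two-hash bloom filter (a hand-rolled stand-in for illustration; the class and method names are made up, not Uniffle code). The safety property it relies on: a bloom filter has no false negatives, so a block whose id misses the filter is guaranteed not to be in (expectBlockIds - processBlockIds) and can be skipped.

```java
import java.util.BitSet;

// Toy two-hash bloom filter over long blockIds (illustration only).
class ToyBloomFilter {
  private final BitSet bits;
  private final int size;

  ToyBloomFilter(int size) {
    this.size = size;
    this.bits = new BitSet(size);
  }

  private int h1(long v) {
    return Math.floorMod(Long.hashCode(v * 0x9E3779B97F4A7C15L), size);
  }

  private int h2(long v) {
    return Math.floorMod(Long.hashCode(Long.rotateLeft(v, 31) + 0x165667B19E3779F9L), size);
  }

  void add(long v) {
    bits.set(h1(v));
    bits.set(h2(v));
  }

  // False positives are possible; false negatives are not.
  boolean mightContain(long v) {
    return bits.get(h1(v)) && bits.get(h2(v));
  }

  public static void main(String[] args) {
    ToyBloomFilter wanted = new ToyBloomFilter(1 << 16);
    // Pretend these ids form (expectBlockIds - processBlockIds).
    for (long id = 100; id < 110; id++) {
      wanted.add(id);
    }
    // Every wanted id must hit the filter: skipping one would lose data.
    for (long id = 100; id < 110; id++) {
      if (!wanted.mightContain(id)) {
        throw new AssertionError("false negative for id " + id);
      }
    }
    System.out.println("all wanted ids hit the filter");
  }
}
```

A real implementation would size the filter from the expected id count and a target false-positive rate; the point here is only the no-false-negative guarantee that makes skipping safe.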

frankliee (Contributor) commented Nov 4, 2022

Besides, it is better to add the [ISSUE-ID] to the title.

xianjingfeng (Member, Author) replied:

> Maybe we do not need precise skipping?

I think precise skipping is better. And I will send the filter only once for the same MemoryClientReadHandler.

xianjingfeng (Member, Author) replied:

> Besides, it is better to add the [ISSUE-ID] to the title.

I have not created an issue for this PR. It is part of #129, and issue #124 covers #129. Should I use [ISSUE-124] or create another?

jerqi (Contributor) commented Nov 4, 2022

Maybe we can pass the min and max blockId instead of a bitmap?

frankliee (Contributor) commented Nov 4, 2022

> I think precise skipping is better.

The client side already has precise skipping. The bitmap of all blockIds can still be very large under data skew. Coarse-grained skipping is widely used, for example in Parquet, Spark runtime filters, and ClickHouse. Besides a bloom filter, the min-max of the blockIds is also a potential option.

xianjingfeng (Member, Author) replied:

1. I think min and max blockId are OK, but processBlockIds is discontinuous, so we may need an array.
2. If we send the filter only once for the same MemoryClientReadHandler, we need to store some state for it, which may cost a lot of memory when there are many tasks. Is this still needed?

@jerqi @frankliee @zuston

jerqi (Contributor) commented Nov 4, 2022

> Is this still needed?

It seems we don't need processedBlockIds at all. We can shrink the expected blockId ranges according to the processed blocks.

xianjingfeng (Member, Author) replied:

> We can shrink the expected blockId ranges according to the processed blocks.

Got it.

xianjingfeng (Member, Author) replied:

But the final expected blockIds are also discontinuous. I'm going to use an array to store them, like [start1, end1, start2, end2], and if endN - startN is too small, I will drop that range to keep the array small.

jerqi (Contributor) commented Nov 4, 2022

> I'm going to use an array to store them, like [start1, end1, start2, end2].

We can limit the array size and do our best to filter out the data we have already processed.
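The flat [start1, end1, start2, end2, ...] encoding discussed above can be checked with a simple scan. This is an illustrative sketch under the assumption that each range is inclusive; the class and method names are hypothetical, not the PR's actual code:

```java
// Sketch of the flat-range filter: ranges = [start1, end1, start2, end2, ...],
// each range inclusive. A block should be read iff its id falls in some range.
class BlockIdRangeFilter {
  static boolean inRanges(long blockId, long[] ranges) {
    for (int i = 0; i + 1 < ranges.length; i += 2) {
      if (blockId >= ranges[i] && blockId <= ranges[i + 1]) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    long[] ranges = {10, 20, 30, 40};
    assert inRanges(15, ranges);   // inside the first range
    assert inRanges(30, ranges);   // boundary of the second range
    assert !inRanges(25, ranges);  // in the gap: this block can be skipped
    System.out.println("range checks passed");
  }
}
```

With a size-limited, sorted array a linear scan is already cheap; a server could binary-search the starts instead if the array were allowed to grow.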

jerqi (Contributor) commented Nov 4, 2022

Maybe we need a POC to verify the effect of each method.

jerqi (Contributor) commented Nov 11, 2022

This PR can also optimize AQE performance. @leixm Maybe you are interested.

jerqi (Contributor) commented Nov 18, 2022

Maybe we can support multiple filters and let users choose the one they like. MinMax may be good for the AQE case, while a bitmap may be good for multiple replicas. We need some extra tests.

zuston (Member) commented Nov 24, 2022

A question: is the MinMax range over task ids rather than blockIds? @jerqi

zuston (Member) commented Nov 24, 2022

And why not directly use the taskIdBitmap to filter most of the data, especially for AQE?

Do you mind if I pick up this ticket to improve AQE skew performance? If you want this to also support multiple replicas, you can go on with it. @xianjingfeng

jerqi (Contributor) commented Nov 24, 2022

> Why not directly use the taskIdBitmap to filter most of the data?

A bitmap is OK. Our concern is the size of the bitmap; it needs some tests.

zuston (Member) commented Nov 24, 2022

> Our concern is the size of the bitmap.

The size of the taskIdsBitmap should be small. Actually, it only contains a limited number of task ids.

jerqi (Contributor) commented Nov 25, 2022

> The size of the taskIdsBitmap should be small.

One million tasks occupy about 125 KB of memory. If we used a blockId bitmap instead, it might occupy several MB.
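The arithmetic behind that estimate: a dense bitmap needs one bit per possible id, so one million task ids need about 125 KB, while a blockId space orders of magnitude larger runs to several MB. This is a back-of-the-envelope check for uncompressed bitmaps, not a RoaringBitmap measurement (RoaringBitmap usually compresses well below these bounds); the illustrative blockId-space size is an assumption:

```java
// Back-of-the-envelope sizes for dense (uncompressed) bitmaps.
class BitmapSizeEstimate {
  static long denseBitmapBytes(long maxId) {
    return maxId / 8;  // one bit per id
  }

  public static void main(String[] args) {
    // 1,000,000 task ids -> 125,000 bytes (~125 KB), matching the estimate above.
    assert denseBitmapBytes(1_000_000L) == 125_000L;
    // A hypothetical blockId space of 100 million ids -> ~12.5 MB stored densely.
    assert denseBitmapBytes(100_000_000L) == 12_500_000L;
    System.out.println("size estimates check out");
  }
}
```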

xianjingfeng (Member, Author) replied:

I think in most cases we only need to read from one replica, so we can give priority to AQE.

> One million tasks occupy about 125 KB of memory.

And I think this is acceptable, and the taskIdBitmap is more precise for multiple replicas. @jerqi @zuston

zuston (Member) commented Nov 25, 2022

It's OK for me. I think we could disable this taskIdBitmap filter when the job does not use AQE. And especially with AQE, the set of taskIds for one reader should not be large.

What do you think? @jerqi @xianjingfeng

xianjingfeng (Member, Author) replied:

I'm torn between taskBitmap and min-max as the multi-replica filter. Could we discuss this? If the taskBitmap is small enough, could we choose it for multiple replicas too? That would make it easier to combine AQE and multiple replicas.

Theoretically, I think min-max will be smaller and more precise for multiple replicas in practice, because processedBlockIds is basically continuous in most cases.

jerqi (Contributor) commented Dec 5, 2022

How do we combine AQE and multiple replicas? Do we need to pass two filters?

xianjingfeng (Member, Author) replied:

> Do we need to pass two filters?

We just need to pass one of them. I think the taskIdBitmap will support multiple replicas in the future. If @zuston doesn't do it, I will.

zuston (Member) commented Dec 6, 2022

Feel free to do it. I have no plan to support a multiple-replica filter.

zuston (Member) commented Dec 6, 2022

> Theoretically, I think min-max will be smaller and more precise for multiple replicas in practice.

@xianjingfeng A small question: are the min-max blockIds filter and the taskIdBitmap filter mutually exclusive?

xianjingfeng (Member, Author) replied:

> Are the min-max blockIds filter and the taskIdBitmap filter mutually exclusive?

Yes.

jerqi (Contributor) commented Dec 6, 2022

Could we add some performance tests for this feature with production jobs?

xianjingfeng (Member, Author) replied:

> Could we add some performance tests for this feature with production jobs?

I will.

xianjingfeng (Member, Author) commented Dec 9, 2022

Performance Test

Tables

Table1: 10 GB, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,StringType)). Every value of k1 is the same (value = 10).

Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,StringType), (v2,StringType)). It has only one record with k2 = 10.

Env

Spark resource profile: 10 executors (1 core, 4 GB each)
Shuffle server environment: 6 shuffle servers, 20 GB for the read buffer and 40 GB for the write buffer
Spark shuffle client config: storage type MEMORY_LOCALFILE_HDFS with LOCAL_ORDER
SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")

Result

BITMAP and MINMAX look similar; I think the gap between them has little impact on overall performance. See the picture below.

[screenshot: sc-20221209163411]

cc @jerqi @zuston

jerqi (Contributor) commented Dec 9, 2022

OK.

jerqi (Contributor) commented Dec 10, 2022

Could you modify the description of this PR? Does it only add a range filter strategy for AQE? Will multiple replicas also be supported for filtering data in this PR?

xianjingfeng (Member, Author) replied:

> Could you modify the description of this PR?

Done.

> Will multiple replicas also be supported for filtering data in this PR?

Yes, it supports multiple replicas too.

Inline review thread on readShuffleData:

```java
@Override
public ShuffleDataResult readShuffleData() {
  if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy) && lastBlockId == Constants.INVALID_BLOCK_ID) {
```

A Contributor asked: Why do we check lastBlockId == Constants.INVALID_BLOCK_ID?

xianjingfeng (Member, Author): We only need to build the blockId range the first time the handler reads.

Contributor: OK, got it.

jerqi (Contributor) left a review comment:

LGTM. @zuston Do you have any other suggestions?

jerqi merged commit 55191c4 into apache:master on Dec 11, 2022
jerqi (Contributor) commented Dec 11, 2022

Merged. Thanks all @frankliee @zuston @xianjingfeng. @xianjingfeng Could you raise a follow-up PR to add some docs about this feature?

xianjingfeng (Member, Author) replied:

> Could you raise a follow-up PR to add some docs about this feature?

Yes.

xianjingfeng (Member, Author) replied:

BLOCKID_RANGE is not a good choice after all, because blockIds are not continuous. My fault. 😂 @jerqi

```java
// BlockId is a long composed of a sequence number (AtomicInteger), partitionId and taskAttemptId.
// The AtomicInteger sequence number is the first (highest) 19 bits, max value 2^19 - 1.
// partitionId is the next 24 bits, max value 2^24 - 1.
// taskAttemptId is the remaining 20 bits, max value 2^20 - 1.
public static Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
```

Should we remove it or modify the blockId generation rule?
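A sketch of the bit layout described in the comment above (the widths come from the comment; the packing arithmetic is my reading of it, not the actual Uniffle implementation). It makes the problem concrete: because the sequence number sits in the highest bits, two consecutive blocks from the same task differ by 2^44, which is exactly why blockIds are nowhere near continuous:

```java
// Assumed BlockId layout per the comment: [ seqNo:19 | partitionId:24 | taskAttemptId:20 ]
class BlockIdLayout {
  static final int TASK_ATTEMPT_ID_BITS = 20;
  static final int PARTITION_ID_BITS = 24;

  static long getBlockId(long partitionId, long taskAttemptId, long seqNo) {
    return (seqNo << (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS))
        | (partitionId << TASK_ATTEMPT_ID_BITS)
        | taskAttemptId;
  }

  static long getSeqNo(long blockId) {
    return blockId >>> (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS);
  }

  static long getPartitionId(long blockId) {
    return (blockId >>> TASK_ATTEMPT_ID_BITS) & ((1L << PARTITION_ID_BITS) - 1);
  }

  static long getTaskAttemptId(long blockId) {
    return blockId & ((1L << TASK_ATTEMPT_ID_BITS) - 1);
  }

  public static void main(String[] args) {
    long id = getBlockId(5, 7, 3);
    assert getPartitionId(id) == 5;
    assert getTaskAttemptId(id) == 7;
    assert getSeqNo(id) == 3;
    // Consecutive sequence numbers are 2^44 apart, so ids are far from contiguous.
    assert getBlockId(5, 7, 4) - id == (1L << 44);
    System.out.println("layout round-trips");
  }
}
```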

jerqi (Contributor) commented Dec 12, 2022

> Should we remove it or modify the blockId generation rule?

Let's remove it. We can use the taskBitmap as the replica filter. It's hard to modify the block generation rule; that would be an incompatible change.

xianjingfeng (Member, Author) replied:

> Let's remove it.

Revert it directly, or keep some of the changes, such as BlockSkipStrategy?

xianjingfeng (Member, Author) replied:

I wonder why we put the AtomicInteger at the front of the blockId. What is the purpose of this design? @jerqi

xianjingfeng added a commit to xianjingfeng/incubator-uniffle that referenced this pull request on Dec 12, 2022.

jerqi pushed a commit that referenced this pull request on Dec 12, 2022:

This reverts commit 55191c4.

### What changes were proposed in this pull request?
Revert #294

### Why are the changes needed?
BlockId is discontinuous, so BLOCKID_RANGE is not a good choice for filtering memory data.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No need
jerqi (Contributor) commented Dec 12, 2022

> I wonder why we put the AtomicInteger at the front of the blockId.

It reduces the size of the RoaringBitmap: we should put the least frequently changing data in the higher bits. RoaringBitmap groups values into containers keyed by their high bits, so keeping the high bits nearly constant keeps the number of containers, and thus the bitmap, small.
