Send commit concurrently in client side #59
Conversation
Codecov Report

```
@@             Coverage Diff              @@
##             master      #59      +/-   ##
============================================
- Coverage     55.21%   55.16%   -0.05%
+ Complexity     1111     1110       -1
============================================
  Files           148      148
  Lines          7953     7962       +9
  Branches        760      760
============================================
+ Hits           4391     4392       +1
- Misses         3321     3328       +7
- Partials        241      242       +1
```

Continue to review full report at Codecov.
Do you have performance tests? I guess this PR can't improve the performance, because the performance bottleneck of …
Yes, I tested it. When using LOCALFILE mode, it cost 7.3 min.
As far as I know, the spill-to-disk event needs to be triggered by the client side. So if the previous trigger is blocked, the next one will …
We don't recommend using the storageType …
```
@@ -247,43 +249,57 @@ public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }

   /**
    * This method will wait until all shuffle data have been spilled
```
`spilled` -> `flushed`.
Please put performance test results into …
```
@@ -17,6 +17,8 @@

 package org.apache.uniffle.client.util;

 import org.apache.hadoop.io.OutputBuffer;
```
Why do we need this?
```
  private final ForkJoinPool dataTransferPool;

  public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
-                               int dataTranferPoolSize) {
+                               int dataTranferPoolSize, int commitSenderPoolSize) {
```
We prefer the code style as below:

```java
public ShuffleWriteClientImpl(
    String clientType,
    int retryMax,
    long retryIntervalMax,
    int heartBeatThreadNum,
    int replica,
    int replicaWrite,
    int replicaRead,
    boolean replicaSkipEnabled,
    int dataTranferPoolSize,
    int commitSenderPoolSize) {
```
```
public static final String RSS_COMMIT_SENDER_POOL_SIZE =
    MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE;
public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE =
    RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE;
```
The name's style should be consistent with `data_transfer_pool_size`. How about `data_commit_pool_size`?
Could you update the document about this feature?
4b5389f
If we close the ForkJoinPool within the scope of the method, I think it's ok.
Ok |
The performance of …
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
Besides, I think I can submit a new PR to let …
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
We'd better have performance tests.
```
        });
      });
    }).join();
  } catch (Exception e) {
```
Should we use

```java
finally {
  forkJoinPool.shutdownNow();
}
```
My fault.
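The point of the `finally { forkJoinPool.shutdownNow(); }` suggestion above is that a pool created inside a method should be torn down in that method even when the task fails. A minimal sketch of the pattern (the `parallelSum` task is a hypothetical stand-in, not the PR's actual code):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class MethodScopedPool {

  // Run a parallel task on a pool whose lifetime is bounded by this method.
  static long parallelSum(int n, int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      // submit(Callable) returns a ForkJoinTask; join() waits for the result.
      return pool.submit(() ->
          LongStream.rangeClosed(1, n).parallel().sum()
      ).join();
    } finally {
      // Runs even if the task throws, so the pool's threads are never leaked.
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(parallelSum(100, 4)); // prints 5050
  }
}
```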
Could you update the document, since this PR introduces a user-facing change?
Done @jerqi |
LGTM
What changes were proposed in this pull request?
Send commit concurrently on the client side.

Why are the changes needed?
I found that when using the `LOCALFILE` storageType, waiting for the commit costs too much time. To speed this up, the commit can be sent concurrently using a thread pool.

Performance Test Case
Using 1000 Spark executors, 1g/1core per executor, to run TeraSort on 1TB.
When using the `LOCALFILE` storageType mode, it cost 7.3 min. After applying this PR, it cost 6.1 min.

Does this PR introduce any user-facing change?
Adds the configuration `rss.client.data.commit.pool.size`; the default value is the shuffle server size.

How was this patch tested?
No need.
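The idea described above — issuing the commit to every shuffle server in parallel instead of one at a time — can be sketched as below. This is a simplified illustration, not the PR's actual implementation: `sendCommitToServer` is a hypothetical stand-in for the per-server RPC, and the pool size parameter plays the role of `rss.client.data.commit.pool.size`.

```java
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentCommitSketch {

  // Hypothetical stand-in for the real per-server commit RPC.
  static boolean sendCommitToServer(String server) {
    return true; // simulate a successful commit acknowledgment
  }

  // Fan the commit out to all servers concurrently, then wait for every result.
  static boolean sendCommitConcurrently(List<String> servers, int poolSize)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<Boolean>> futures = new ArrayList<>();
      for (String server : servers) {
        futures.add(pool.submit(() -> sendCommitToServer(server)));
      }
      boolean allOk = true;
      for (Future<Boolean> f : futures) {
        allOk &= f.get(); // block until each server's commit finishes
      }
      return allOk;
    } finally {
      pool.shutdownNow(); // tear the pool down in the scope of the method
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> servers = List.of("server-1", "server-2", "server-3");
    System.out.println(sendCommitConcurrently(servers, servers.size()));
  }
}
```

With a serial loop the total wait is the sum of all per-server commit times; with the pool it shrinks toward the slowest single server, which matches the 7.3 min to 6.1 min improvement reported above.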