-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#719] feat(netty): Optimize allocation strategy #739
Conversation
Codecov Report
@@ Coverage Diff @@
## master #739 +/- ##
============================================
+ Coverage 60.73% 62.81% +2.07%
- Complexity 1859 1898 +39
============================================
Files 231 224 -7
Lines 12857 11049 -1808
Branches 1071 1091 +20
============================================
- Hits 7809 6940 -869
+ Misses 4633 3732 -901
+ Partials 415 377 -38
... and 23 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
Outdated
Show resolved
Hide resolved
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
Outdated
Show resolved
Hide resolved
CoordinatorTestUtils.waitForRegister(coordinatorClient,2); | ||
// When the shuffleServerHeartbeat Test is completed before the current test, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 249 in 1021031
shuffleServerConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, SHUFFLE_SERVER_PORT + 5); |
Because this
shuffleServerConf
modifies the netty port number of a server, there will be one less machine allocated. Therefore, in order to maintain the original test, we need to set the port number back to a negative number, which is equivalent to setting the label to GRPC
.
integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
Outdated
Show resolved
Hide resolved
@@ -130,6 +131,11 @@ public static void main(String[] args) { | |||
assignmentTags.addAll(Arrays.asList(rawTags.split(","))); | |||
} | |||
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION); | |||
if (ClientType.GRPC_NETTY.name().equals(conf.get(RssMRConfig.RSS_CLIENT_TYPE))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also add a conf validator for RSS_CLIENT_TYPE here?
Then I think we can simply call assignmentTags.add(conf.get(RssMRConfig.RSS_CLIENT_TYPE))
here?
common/src/main/java/org/apache/uniffle/common/config/RssConf.java
Outdated
Show resolved
Hide resolved
@@ -122,4 +126,11 @@ public static void validateTestModeConf(boolean testMode, String storageType) { | |||
+ "because of the poor performance of these two types."); | |||
} | |||
} | |||
|
|||
public static void validateClientType(String clientType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we convert this method to CheckValue#checkValueFunc
like ConfigUtil#POSITIVE_LONG_VALIDATOR
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, i will try it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem to work because there is no checkValue
on mr's client side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got it. My mistake.
Field field = clazz.getDeclaredField("settings"); | ||
field.setAccessible(true); | ||
originConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key()); | ||
field.set(shuffleServerConf, originConf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need set the value again. They are the same object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @smallzhongfeng @advancedxy
Thanks for your review! @jerqi @advancedxy |
### What changes were proposed in this pull request? Users can choose to use netty's transmission method or grpc's through client configuration. ### Why are the changes needed? Fix: apache#719 ### Does this PR introduce _any_ user-facing change? No. However, if users want to use `netty` as a data transfer method, they need to enable `spark.rss.client.type=GRPC_ NETTY` or `mapreduce.rss.client.type=GRPC_ NETTY` ### How was this patch tested? New uts.
### What changes were proposed in this pull request? Users can choose to use netty's transmission method or grpc's through client configuration. ### Why are the changes needed? Fix: apache#719 ### Does this PR introduce _any_ user-facing change? No. However, if users want to use `netty` as a data transfer method, they need to enable `spark.rss.client.type=GRPC_ NETTY` or `mapreduce.rss.client.type=GRPC_ NETTY` ### How was this patch tested? New uts.
What changes were proposed in this pull request?
Users can choose to use netty's transmission method or grpc's through client configuration.
Why are the changes needed?
Fix: #719
Does this PR introduce any user-facing change?
No. However, if users want to use
netty
as a data transfer method, they need to enablespark.rss.client.type=GRPC_ NETTY
ormapreduce.rss.client.type=GRPC_ NETTY
How was this patch tested?
New uts.