-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#133] feat(netty): Introduce ShuffleServerGrpcNettyClient. #839
Conversation
Codecov Report
@@ Coverage Diff @@
## master #839 +/- ##
============================================
+ Coverage 57.09% 58.73% +1.64%
- Complexity 2144 2175 +31
============================================
Files 321 307 -14
Lines 15648 13617 -2031
Branches 1243 1262 +19
============================================
- Hits 8934 7998 -936
+ Misses 6206 5184 -1022
+ Partials 508 435 -73
... and 43 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
cc @smallzhongfeng PTAL. |
It seems a bit messy at the moment. Some codes use ByteBuffer, such as FixedSizeSegmentSplitter and ShuffleReadClientImpl, and some codes use ByteBuf, such as netty client or protocol objects. At present, I prefer to use ByteBuf in nettyclient, and keep ByteBuffer in other places. |
+1 for this change. but also cc @jerqi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for some minor changes.
long start = System.currentTimeMillis(); | ||
RpcResponse rpcResponse = transportClient.sendRpcSync(sendShuffleDataRequest, RPC_TIMEOUT_DEFAULT_MS); | ||
LOG.debug("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - start) | ||
+ " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: code Indent
+ " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port); | ||
if (rpcResponse.getStatusCode() != StatusCode.SUCCESS) { | ||
String msg = "Can't send shuffle data with " + finalBlockNum | ||
+ " blocks to " + host + ":" + port |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: code Indent
request.getExpectedTaskIds() | ||
); | ||
String requestInfo = "appId[" + request.getAppId() | ||
+ "], shuffleId[" + request.getShuffleId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
OK for me. |
@@ -79,7 +80,8 @@ public ShuffleReadClientImpl( | |||
IdHelper idHelper, | |||
ShuffleDataDistributionType dataDistributionType, | |||
boolean expectedTaskIdsBitmapFilterEnable, | |||
boolean offHeapEnabled) { | |||
boolean offHeapEnabled, | |||
RssConf rssConf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can reduce some parameters of constructor method if we pass the rssConf to the ShuffleReadClientImpl
actually.
@smallzhongfeng @jerqi PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @leixm
What changes were proposed in this pull request?
Introduce ShuffleServerGrpcNettyClient.
Why are the changes needed?
for #133
Does this PR introduce any user-facing change?
No.
How was this patch tested?
existing UTs.