-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1596] fix(netty): Use a ChannelFutureListener callback mechanism to release readMemory #1605
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #1605 +/- ##
============================================
+ Coverage 54.02% 54.85% +0.82%
Complexity 2863 2863
============================================
Files 438 418 -20
Lines 24858 22551 -2307
Branches 2114 2123 +9
============================================
- Hits 13430 12370 -1060
+ Misses 10587 9408 -1179
+ Partials 841 773 -68 ☔ View full report in Codecov by Sentry. |
Test Results 2 340 files ±0 2 340 suites ±0 4h 31m 8s ⏱️ - 1m 0s For more details on these failures and errors, see this check. Results for commit cf50ded. ± Comparison against base commit 3a1b4d2. ♻️ This comment has been updated with latest results. |
new ReleaseMemoryAndRecordReadTimeListener( | ||
start, readBufferSize, data.size(), requestInfo, req, client); | ||
client.getChannel().writeAndFlush(response).addListener(listener); | ||
return; | ||
} catch (Exception e) { | ||
status = StatusCode.INTERNAL_ERROR; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here haven't release the read memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! Done.
new ReleaseMemoryAndRecordReadTimeListener( | ||
start, assumedFileSize, data.size(), requestInfo, req, client); | ||
client.getChannel().writeAndFlush(response).addListener(listener); | ||
return; | ||
} catch (FileNotFoundException indexFileNotFoundException) { | ||
LOG.warn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
new ReleaseMemoryAndRecordReadTimeListener( | ||
start, length, sdr.getDataLength(), requestInfo, req, client); | ||
client.getChannel().writeAndFlush(response).addListener(listener); | ||
return; | ||
} catch (Exception e) { | ||
status = StatusCode.INTERNAL_ERROR; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} catch (FileNotFoundException indexFileNotFoundException) { | ||
if (shuffleIndexResult != null) { | ||
shuffleIndexResult.release(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The memory requirement is not released. Why not using shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. I've missed it. Done.
} catch (Exception e) { | ||
shuffleServer.getShuffleBufferManager().releaseReadMemory(length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some concern about this. Will it release twice? Because maybe completelistener will release memory when it throws exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, writeAndFlush
is asynchronous, so no exceptions will be catched here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we throw an exception in the future listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new RssException("Can not handle request " + request.type());
You mean this code snippet? I think we will never reach here, so it's fine we do anything. It's like you use assert
to make sure something will not happen. And if it happens, it must be a serious bug that needs to be fixed immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this exception will be ignored now. It's not a good way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or I can just log an error message if you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ERROR log message or assertion is acceptable. It's up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Now we still use Grpc as the default. I think we change default config options when we use Netty as default. |
Because the We actually are using "more" memory. This is the normal behavior. Before this PR, the |
Do you want to change the default value in this PR? |
I get your point. You want to revert the changes to the configurations' default values in this PR. Right? |
Yes. |
Done. Default values are reverted. But I think descriptions still need to be updated. Because the implemention of the code is changed. |
@@ -42,7 +42,7 @@ public class ShuffleServerConf extends RssBaseConf { | |||
.doubleType() | |||
.defaultValue(0.6) | |||
.withDescription( | |||
"JVM heap size * ratio for the maximum memory of buffer manager for shuffle server, this " | |||
"JVM heap size or off-heap size(when enabling Netty) * ratio for the maximum memory of buffer manager for shuffle server, this " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you modify the document, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Updated.
Please update the description, and then I will merge this. |
I think the descriptions of the configs have already been updated in this PR. |
Got it. Thanks for your contribution, merged. |
…nnel is writable (#1641) ### What changes were proposed in this pull request? 1. Send failed responses only when the channel is writable. 2. Print debug logs when the data is successfully sent, reducing log output. 3. Reduce the duplicated error log. ### Why are the changes needed? A follow-up PR for: #1605. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.
…nnel is writable (#1641) ### What changes were proposed in this pull request? 1. Send failed responses only when the channel is writable. 2. Print debug logs when the data is successfully sent, reducing log output. 3. Reduce the duplicated error log. ### Why are the changes needed? A follow-up PR for: #1605. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.
What changes were proposed in this pull request?
ChannelFutureListener
and use its callback mechanism to releasereadMemory
only after thewriteAndFlush
method is truly completed.rss.server.buffer.capacity.ratio
andrss.server.read.buffer.capacity.ratio
.Why are the changes needed?
This is actually a bug, which was introduced by PR #879. The issue has been present since the very beginning when the Netty feature was first integrated.
Fix #1596.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
I don't think we need new tests. Tested in our env.
The new log will be: