-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#133] feat(netty): Add StreamServer. #718
Conversation
Codecov Report
@@ Coverage Diff @@
## master #718 +/- ##
============================================
+ Coverage 60.60% 62.92% +2.32%
- Complexity 1849 1860 +11
============================================
Files 229 217 -12
Lines 12749 10898 -1851
Branches 1064 1073 +9
============================================
- Hits 7727 6858 -869
+ Misses 4611 3686 -925
+ Partials 411 354 -57
... and 14 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@@ -404,6 +404,48 @@ public class ShuffleServerConf extends RssBaseConf { | |||
.defaultValue(-1) | |||
.withDescription("Shuffle netty server port"); | |||
|
|||
public static final ConfigOption<Boolean> NETTY_SERVER_ENABLED = ConfigOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this config option. If netty port is set, we should enable Netty server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok for me.
.defaultValue(false) | ||
.withDescription("If enable netty server"); | ||
|
||
public static final ConfigOption<Boolean> SERVER_UPLOAD_EPOLL_ENABLE = ConfigOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the meaning of upload
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already Fixed.
|
||
private static final Logger logger = LoggerFactory.getLogger(StreamServerInitDecoder.class); | ||
|
||
private ShuffleServer shuffleServer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need ShuffleServer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, StreamServerInitDecoder needs ShuffleServer to obtain taskManager, etc. I will delete it in this PR, and add it to subsequent PRs.
@smallzhongfeng Could you help me review this pr? |
ch.pipeline().addLast(handlerSupplier.get()); | ||
} | ||
}) | ||
.option(ChannelOption.SO_BACKLOG, backlogSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should add Option TCP_NODELAY
, this parameter should be used to send packets this time. It does not need to use Nagle algorithm to reduce the delay time.
channelFuture.channel().close().awaitUninterruptibly(10L, TimeUnit.SECONDS); | ||
channelFuture = null; | ||
} | ||
shuffleBossGroup.shutdownGracefully(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShuffleBossGroup
determines whether it is empty ?
|
||
addDecoder(ctx, magicByte); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this blank line.
@smallzhongfeng All done. |
@@ -221,6 +230,10 @@ private void initialization() throws Exception { | |||
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager); | |||
shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf, shuffleFlushManager, | |||
shuffleBufferManager, storageManager); | |||
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe >= 0?
@jerqi could you also add port=0 for random port binding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We set up an issue first.
.key("rss.server.netty.epoll.enable") | ||
.booleanType() | ||
.defaultValue(false) | ||
.withDescription("If enable epoll model with netty server"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: whether to enable epoll mode with netty server?
Also, could you add more description about how epoll mode diffs with normal mode?
You can update the docs in the final PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EPOLL is more suitable for scenarios with a large number of connections, but we need more tests to verify the difference between nio and epoll in uniffle usage scenarios. It is difficult to give a suggestion at present, so the default value is nio.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is difficult to give a suggestion at present, so the default value is nio.
That's OK. Let's add a todo in the final PR, which I think will includes a comprehensive docs.
public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_BACKLOG = ConfigOptions | ||
.key("rss.server.netty.connect.backlog") | ||
.intType() | ||
.defaultValue(1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is 1000 enough?
.defaultValue(-1) | ||
.withDescription("the optimal size for send buffer(SO_SNDBUF) " | ||
+ "should be latency * network_bandwidth. Assuming latency = 1ms," | ||
+ "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some desc about what the default value -1
means?
.key("rss.server.netty.receive.buf") | ||
.intType() | ||
.defaultValue(-1) | ||
.withDescription("the optimal size for receive buffer(SO_RCVBUF) " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
ServerBootstrap serverBootstrap = bossGroup instanceof EpollEventLoopGroup | ||
? new ServerBootstrap().group(bossGroup, workerGroup) | ||
.channel(EpollServerSocketChannel.class) | ||
: new ServerBootstrap().group(bossGroup, workerGroup) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the indentation looks a bit weird..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@advancedxy All done. |
@jerqi @advancedxy Can you help review plz? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @advancedxy @leixm @smallzhongfeng merged.
### What changes were proposed in this pull request? Add StreamServer for netty replace grpc. ### Why are the changes needed? Add StreamServer. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Co-authored-by: leixianming <leixianming@didiglobal.com>
### What changes were proposed in this pull request? Add StreamServer for netty replace grpc. ### Why are the changes needed? Add StreamServer. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Co-authored-by: leixianming <leixianming@didiglobal.com>
What changes were proposed in this pull request?
Add StreamServer for netty replace grpc.
Why are the changes needed?
Add StreamServer.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT.