Skip to content

Commit

Permalink
[#133] feat(netty): Add StreamServer. (#718)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add StreamServer, a Netty-based server intended to replace gRPC.

### Why are the changes needed?
Add StreamServer.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

UT.

Co-authored-by: leixianming <leixianming@didiglobal.com>
  • Loading branch information
leixm and leixianming committed Mar 15, 2023
1 parent ea4f8c4 commit 46f3f7e
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.netty.StreamServer;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.util.StorageType;
Expand Down Expand Up @@ -92,6 +93,8 @@ public class ShuffleServer {
private volatile boolean running;
private ExecutorService executorService;
private Future<?> decommissionFuture;
private boolean nettyServerEnabled;
private StreamServer streamServer;

public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
Expand Down Expand Up @@ -124,6 +127,9 @@ public void start() throws Exception {
registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
if (nettyServerEnabled) {
streamServer.start();
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down Expand Up @@ -164,6 +170,9 @@ public void stopServer() throws Exception {
}
SecurityContextFactory.get().getSecurityContext().close();
server.stop();
if (nettyServerEnabled && streamServer != null) {
streamServer.stop();
}
if (executorService != null) {
executorService.shutdownNow();
}
Expand Down Expand Up @@ -221,6 +230,10 @@ private void initialization() throws Exception {
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager);
shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf, shuffleFlushManager,
shuffleBufferManager, storageManager);
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
streamServer = new StreamServer(this);
}

setServer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,61 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(-1)
.withDescription("Shuffle netty server port");

/**
 * Selects the event loop implementation used by the netty stream server: when {@code true},
 * {@code StreamServer} creates epoll event loop groups, otherwise NIO ones.
 * Defaults to {@code false}.
 */
public static final ConfigOption<Boolean> NETTY_SERVER_EPOLL_ENABLE = ConfigOptions
.key("rss.server.netty.epoll.enable")
.booleanType()
.defaultValue(false)
.withDescription("If enable epoll model with netty server");

/**
 * Thread count handed to the boss (accept) event loop group created by {@code StreamServer}.
 * Defaults to 10.
 */
public static final ConfigOption<Integer> NETTY_SERVER_ACCEPT_THREAD = ConfigOptions
.key("rss.server.netty.accept.thread")
.intType()
.defaultValue(10)
.withDescription("Accept thread count in netty");

/**
 * Thread count handed to the worker event loop group created by {@code StreamServer}.
 * Defaults to 100.
 */
public static final ConfigOption<Integer> NETTY_SERVER_WORKER_THREAD = ConfigOptions
.key("rss.server.netty.worker.thread")
.intType()
.defaultValue(100)
.withDescription("Worker thread count in netty");

/**
 * Idle timeout for netty handlers. Defaults to 60000.
 * NOTE(review): the unit is presumably milliseconds, and no reader of this option is visible
 * next to its declaration - confirm both against the consuming code.
 */
public static final ConfigOption<Long> SERVER_NETTY_HANDLER_IDLE_TIMEOUT = ConfigOptions
.key("rss.server.netty.handler.idle.timeout")
.longType()
.defaultValue(60000L)
.withDescription("Idle timeout if there has not data");

/**
 * Value that {@code StreamServer} sets as the {@code SO_BACKLOG} option of the listening
 * channel. Defaults to 0.
 */
public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_BACKLOG = ConfigOptions
.key("rss.server.netty.connect.backlog")
.intType()
.defaultValue(0)
.withDescription("For netty server, requested maximum length of the queue of incoming connections. "
+ "Default 0 for no backlog.");

/**
 * Connection timeout in milliseconds: {@code StreamServer} applies it as
 * {@code CONNECT_TIMEOUT_MILLIS} on both the listening channel and the accepted child
 * channels. Defaults to 5000.
 */
public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_TIMEOUT = ConfigOptions
.key("rss.server.netty.connect.timeout")
.intType()
.defaultValue(5000)
.withDescription("Timeout for connection in netty");

/**
 * Send buffer size ({@code SO_SNDBUF}) for channels accepted by the netty stream server.
 * {@code StreamServer} only sets the socket option when the value is greater than 0;
 * the default of 0 leaves the option untouched.
 */
public static final ConfigOption<Integer> NETTY_SERVER_SEND_BUF = ConfigOptions
    .key("rss.server.netty.send.buf")
    .intType()
    .defaultValue(0)
    // Each fragment ends with a space so the concatenated description reads as sentences.
    .withDescription("the optimal size for send buffer(SO_SNDBUF) "
        + "should be latency * network_bandwidth. Assuming latency = 1ms, "
        + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
        + "Default is 0, OS will dynamically adjust the buf size.");

/**
 * Receive buffer size ({@code SO_RCVBUF}) for channels accepted by the netty stream server.
 * {@code StreamServer} only sets the socket option when the value is greater than 0;
 * the default of 0 leaves the option untouched.
 */
public static final ConfigOption<Integer> NETTY_SERVER_RECEIVE_BUF = ConfigOptions
    .key("rss.server.netty.receive.buf")
    .intType()
    .defaultValue(0)
    // Each fragment ends with a space so the concatenated description reads as sentences.
    .withDescription("the optimal size for receive buffer(SO_RCVBUF) "
        + "should be latency * network_bandwidth. Assuming latency = 1ms, "
        + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
        + "Default is 0, OS will dynamically adjust the buf size.");

public ShuffleServerConf() {
}

Expand Down
142 changes: 142 additions & 0 deletions server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.netty;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.netty.decoder.StreamServerInitDecoder;

/**
 * Netty-based stream server owned by a {@link ShuffleServer}.
 *
 * <p>The constructor creates the boss and worker event loop groups (epoll or NIO, depending on
 * {@link ShuffleServerConf#NETTY_SERVER_EPOLL_ENABLE}). {@link #start()} binds the configured
 * port, and {@link #stop()} closes the listening channel and shuts both groups down.
 */
public class StreamServer {

  private static final Logger LOG = LoggerFactory.getLogger(StreamServer.class);

  private final ShuffleServer shuffleServer;
  private final ShuffleServerConf shuffleServerConf;
  private EventLoopGroup shuffleBossGroup;
  private EventLoopGroup shuffleWorkerGroup;
  private ChannelFuture channelFuture;

  /**
   * Creates the server and its event loop groups; no port is bound until {@link #start()}.
   *
   * @param shuffleServer the owning shuffle server, also the source of the configuration
   */
  public StreamServer(ShuffleServer shuffleServer) {
    this.shuffleServer = shuffleServer;
    this.shuffleServerConf = shuffleServer.getShuffleServerConf();
    boolean isEpollEnable = shuffleServerConf.getBoolean(ShuffleServerConf.NETTY_SERVER_EPOLL_ENABLE);
    int acceptThreads = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_ACCEPT_THREAD);
    int workerThreads = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_WORKER_THREAD);
    if (isEpollEnable) {
      shuffleBossGroup = new EpollEventLoopGroup(acceptThreads);
      shuffleWorkerGroup = new EpollEventLoopGroup(workerThreads);
    } else {
      shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
      shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
    }
  }

  /**
   * Builds a {@link ServerBootstrap} for the given event loop groups.
   *
   * @param bossGroup group accepting connections; its concrete type selects the channel class
   * @param workerGroup group serving accepted connections
   * @param backlogSize value for {@code SO_BACKLOG}
   * @param timeoutMillis value for {@code CONNECT_TIMEOUT_MILLIS} on parent and child channels
   * @param sendBuf value for {@code SO_SNDBUF}; only applied when greater than 0
   * @param receiveBuf value for {@code SO_RCVBUF}; only applied when greater than 0
   * @param handlerSupplier supplies the handlers added to every accepted channel's pipeline
   * @return the configured, not yet bound, bootstrap
   */
  private ServerBootstrap bootstrapChannel(
      EventLoopGroup bossGroup,
      EventLoopGroup workerGroup,
      int backlogSize,
      int timeoutMillis,
      int sendBuf,
      int receiveBuf,
      Supplier<ChannelHandler[]> handlerSupplier) {
    ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup);
    // The server channel class has to match the event loop implementation.
    if (bossGroup instanceof EpollEventLoopGroup) {
      serverBootstrap.channel(EpollServerSocketChannel.class);
    } else {
      serverBootstrap.channel(NioServerSocketChannel.class);
    }

    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(final SocketChannel ch) {
            // A fresh set of handlers is requested for every accepted channel.
            ch.pipeline().addLast(handlerSupplier.get());
          }
        })
        .option(ChannelOption.SO_BACKLOG, backlogSize)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Non-positive sizes leave the socket buffer options unset.
    if (sendBuf > 0) {
      serverBootstrap.childOption(ChannelOption.SO_SNDBUF, sendBuf);
    }
    if (receiveBuf > 0) {
      serverBootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBuf);
    }
    return serverBootstrap;
  }

  /**
   * Binds the port configured by {@link ShuffleServerConf#NETTY_SERVER_PORT} and waits for the
   * bind to complete. If binding fails, the channel and event loop groups are released and
   * {@link ExitUtils#terminate} is invoked with status 1.
   */
  public void start() {
    Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[]{
        new StreamServerInitDecoder()
    };
    ServerBootstrap serverBootstrap = bootstrapChannel(shuffleBossGroup, shuffleWorkerGroup,
        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_BACKLOG),
        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_TIMEOUT),
        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_SEND_BUF),
        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_RECEIVE_BUF),
        streamHandlers);

    // Bind the port and keep the future so that the channel can be closed in stop().
    int port = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
    try {
      channelFuture = serverBootstrap.bind(port);
      channelFuture.syncUninterruptibly();
      LOG.info("bind localAddress is {}", channelFuture.channel().localAddress());
      LOG.info("Start stream server successfully with port {}", port);
    } catch (Exception e) {
      // Release the channel and event loop threads before reporting the failure; otherwise
      // they would stay alive whenever terminate() throws instead of exiting the JVM.
      stop();
      ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
    }
  }

  /**
   * Closes the listening channel (waiting up to 10 seconds) and requests a graceful shutdown
   * of both event loop groups. Calling it again afterwards is a no-op.
   */
  public void stop() {
    if (channelFuture != null) {
      channelFuture.channel().close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
      channelFuture = null;
    }
    if (shuffleBossGroup != null) {
      shuffleBossGroup.shutdownGracefully();
      shuffleBossGroup = null;
    }
    if (shuffleWorkerGroup != null) {
      shuffleWorkerGroup.shutdownGracefully();
      shuffleWorkerGroup = null;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.netty.decoder;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * Initial decoder installed on every channel accepted by the stream server.
 *
 * <p>It looks at the first readable byte of the inbound data without consuming it and passes
 * that byte to {@link #addDecoder(ChannelHandlerContext, byte)}, which is currently an empty
 * placeholder.
 */
public class StreamServerInitDecoder extends ByteToMessageDecoder {

  /**
   * Hook for reacting to the leading byte of a connection. It has no body yet.
   *
   * @param ctx context of the channel being decoded
   * @param type the first byte read from the channel
   */
  private void addDecoder(ChannelHandlerContext ctx, byte type) {
    // Intentionally empty for now.
  }

  /**
   * Reads the leading byte once at least one byte is available, restores the reader index so
   * the byte stays in the buffer, and forwards the byte to {@code addDecoder}. Nothing is
   * added to {@code out}.
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() >= Byte.BYTES) {
      // Mark, read and reset: the byte is inspected but not consumed.
      in.markReaderIndex();
      final byte leadingByte = in.readByte();
      in.resetReaderIndex();

      addDecoder(ctx, leadingByte);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,26 @@ private ShuffleServerConf createShuffleServerConf() throws Exception {
serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
return serverConf;
}

/**
 * Starts one shuffle server with the netty port set to 29999, then starts a second server on
 * the same netty port (with different RPC and Jetty ports) and expects that start to fail
 * with an {@code ExitException} carrying status 1 and the stream server failure message.
 */
@Test
public void nettyServerTest() throws Exception {
  // Disable System.exit before any server is started, so that a failure while starting the
  // first server is also covered by the guard and not only the expected failure of the second.
  ExitUtils.disableSystemExit();
  ShuffleServerConf serverConf = createShuffleServerConf();
  serverConf.set(ShuffleServerConf.NETTY_SERVER_PORT, 29999);
  ShuffleServer ss1 = new ShuffleServer(serverConf);
  ss1.start();
  // Only the RPC and Jetty ports change; the netty port stays the same to force a conflict.
  serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19997);
  serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19996);
  ShuffleServer ss2 = new ShuffleServer(serverConf);
  String expectMessage = "Fail to start stream server";
  final int expectStatus = 1;
  try {
    ss2.start();
  } catch (Exception e) {
    assertEquals(expectMessage, e.getMessage());
    assertEquals(expectStatus, ((ExitException) e).getStatus());
    return;
  }
  fail();
}
}

0 comments on commit 46f3f7e

Please sign in to comment.