Skip to content

Commit

Permalink
Merge pull request #258 from lizhanhui/share_netty_event_loop
Browse files Browse the repository at this point in the history
feat: use shared Netty event loop for TCP/TCP over TLS/Web Socket
  • Loading branch information
lizhanhui committed Apr 30, 2024
2 parents eee7527 + be643e4 commit 477d478
Showing 1 changed file with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler;
import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
Expand All @@ -50,11 +51,11 @@

@Service
public class MqttServer {
private static Logger logger = LoggerFactory.getLogger(MqttServer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MqttServer.class);

private ServerBootstrap serverBootstrap = new ServerBootstrap();
private ServerBootstrap wsServerBootstrap = new ServerBootstrap();
private ServerBootstrap tlsServerBootstrap = new ServerBootstrap();
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final ServerBootstrap wsServerBootstrap = new ServerBootstrap();
private final ServerBootstrap tlsServerBootstrap = new ServerBootstrap();

@Resource
private ConnectHandler connectHandler;
Expand All @@ -71,21 +72,39 @@ public class MqttServer {
@Resource
private SslFactory sslFactory;

private NioEventLoopGroup acceptorEventLoopGroup;

private NioEventLoopGroup workerEventLoopGroup;

private AdaptiveTlsHandler adaptiveTlsHandler;

@PostConstruct
public void init() throws Exception {
acceptorEventLoopGroup = new NioEventLoopGroup(connectConf.getNettySelectorThreadNum());
workerEventLoopGroup = new NioEventLoopGroup(connectConf.getNettyWorkerThreadNum());

adaptiveTlsHandler = new AdaptiveTlsHandler(TlsMode.PERMISSIVE, sslFactory);

start();
startWs();
startTls();
}

@PreDestroy
public void shutdown() {
if (null != acceptorEventLoopGroup) {
acceptorEventLoopGroup.shutdownGracefully();
}

if (null != workerEventLoopGroup) {
workerEventLoopGroup.shutdownGracefully();
}
}

private void start() {
int port = connectConf.getMqttPort();
serverBootstrap
.group(new NioEventLoopGroup(connectConf.getNettySelectorThreadNum()), new NioEventLoopGroup(connectConf.getNettyWorkerThreadNum()))
.group(acceptorEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 8 * 1024)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
Expand All @@ -96,14 +115,11 @@ private void start() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("connectHandler", connectHandler);
pipeline.addLast("decoder", new MqttDecoder(connectConf.getMaxPacketSizeInByte()));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("dispatcher", mqttPacketDispatcher);
assembleHandlerPipeline(pipeline);
}
});
serverBootstrap.bind();
logger.warn("start mqtt server , port:{}", port);
LOGGER.info("MQTT server for TCP started, listening: {}", port);
}

private void startTls() {
Expand All @@ -113,7 +129,7 @@ private void startTls() {

int tlsPort = connectConf.getMqttTlsPort();
tlsServerBootstrap
.group(new NioEventLoopGroup(connectConf.getNettySelectorThreadNum()), new NioEventLoopGroup(connectConf.getNettyWorkerThreadNum()))
.group(acceptorEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 8 * 1024)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
Expand All @@ -125,14 +141,18 @@ private void startTls() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("sslHandler", new SslHandler(sslFactory.buildSslEngine(ch)));
pipeline.addLast("connectHandler", connectHandler);
pipeline.addLast("decoder", new MqttDecoder(connectConf.getMaxPacketSizeInByte()));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("dispatcher", mqttPacketDispatcher);
assembleHandlerPipeline(pipeline);
}
});
tlsServerBootstrap.bind();
logger.warn("start mqtt tls server , port:{}", tlsPort);
LOGGER.info("MQTT server for TCP over TLS started, listening: {}", tlsPort);
}

private void assembleHandlerPipeline(ChannelPipeline pipeline) {
pipeline.addLast("connectHandler", connectHandler);
pipeline.addLast("decoder", new MqttDecoder(connectConf.getMaxPacketSizeInByte()));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("dispatcher", mqttPacketDispatcher);
}

/**
Expand All @@ -151,7 +171,7 @@ public void initChannel(SocketChannel ch) throws Exception {
private void startWs() {
int port = connectConf.getMqttWsPort();
wsServerBootstrap
.group(new NioEventLoopGroup(connectConf.getNettySelectorThreadNum()), new NioEventLoopGroup(connectConf.getNettyWorkerThreadNum()))
.group(acceptorEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 8 * 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
Expand All @@ -176,7 +196,7 @@ public void initChannel(SocketChannel ch) throws Exception {
}
});
wsServerBootstrap.bind();
logger.warn("start mqtt ws server , port:{}", port);
LOGGER.info("MQTT server for WebSocket started, listening: {}", port);
}

}

0 comments on commit 477d478

Please sign in to comment.