diff --git a/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java b/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java index 954fdc86f..61b8e101a 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java @@ -23,6 +23,7 @@ import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.handler.timeout.IdleStateEvent; import java.util.Objects; +import java.util.concurrent.ExecutorService; import org.fisco.bcos.sdk.model.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,11 @@ public class ChannelHandler extends SimpleChannelInboundHandler { private static Logger logger = LoggerFactory.getLogger(ChannelHandler.class); private MsgHandler msgHandler; private ConnectionManager connectionManager; + private ExecutorService msgHandleThreadPool; + + public void setMsgHandleThreadPool(ExecutorService msgHandleThreadPool) { + this.msgHandleThreadPool = msgHandleThreadPool; + } public ChannelHandler(ConnectionManager connManager, MsgHandler msgHandler) { this.msgHandler = msgHandler; @@ -140,7 +146,18 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) { final ChannelHandlerContext ctxF = ctx; final Message in = (Message) msg; - msgHandler.onMessage(ctxF, in); + + if (msgHandleThreadPool == null) { + msgHandler.onMessage(ctxF, in); + } else { + msgHandleThreadPool.execute( + new Runnable() { + @Override + public void run() { + msgHandler.onMessage(ctxF, in); + } + }); + } } @Override diff --git a/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java b/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java index fa7874e8e..6c02d71da 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java +++ b/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -177,6 +178,10 @@ private void reconnect() { } } + public void setMsgHandleThreadPool(ExecutorService msgHandleThreadPool) { + channelHandler.setMsgHandleThreadPool(msgHandleThreadPool); + } + public List getConnectionInfoList() { return connectionInfoList; } diff --git a/src/main/java/org/fisco/bcos/sdk/network/Network.java b/src/main/java/org/fisco/bcos/sdk/network/Network.java index cc0c1d4c7..a101eed72 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/Network.java +++ b/src/main/java/org/fisco/bcos/sdk/network/Network.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; import org.fisco.bcos.sdk.config.ConfigOption; import org.fisco.bcos.sdk.model.Message; @@ -77,6 +78,13 @@ static Network build(ConfigOption config, MsgHandler handler) { */ void removeConnection(String peerIpPort); + /** + * Set thread pool + * + * @param threadPool + */ + void setMsgHandleThreadPool(ThreadPoolExecutor threadPool); + /** Exit gracefully */ void stop(); } diff --git a/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java b/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java index 7620b3385..0fab7ac93 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java +++ b/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ThreadPoolExecutor; import org.fisco.bcos.sdk.config.ConfigOption; import org.fisco.bcos.sdk.model.Message; import org.slf4j.Logger; @@ -66,7 +67,6 @@ public List getConnectionInfo() { @Override public void start() throws NetworkException { - // set ThreadPool ?? connManager.startConnect(); connManager.startReconnectSchedule(); } @@ -81,6 +81,11 @@ public void removeConnection(String peerIpPort) { connManager.removeConnection(peerIpPort); } + @Override + public void setMsgHandleThreadPool(ThreadPoolExecutor threadPool) { + connManager.setMsgHandleThreadPool(threadPool); + } + @Override public void stop() { connManager.stopReconnectSchedule();