Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.fisco.bcos.sdk.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +34,11 @@ public class ChannelHandler extends SimpleChannelInboundHandler<Message> {
private static Logger logger = LoggerFactory.getLogger(ChannelHandler.class);
private MsgHandler msgHandler;
private ConnectionManager connectionManager;
private ExecutorService msgHandleThreadPool;

public void setMsgHandleThreadPool(ExecutorService msgHandleThreadPool) {
this.msgHandleThreadPool = msgHandleThreadPool;
}

public ChannelHandler(ConnectionManager connManager, MsgHandler msgHandler) {
this.msgHandler = msgHandler;
Expand Down Expand Up @@ -140,7 +146,18 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final ChannelHandlerContext ctxF = ctx;
final Message in = (Message) msg;
msgHandler.onMessage(ctxF, in);

if (msgHandleThreadPool == null) {
msgHandler.onMessage(ctxF, in);
} else {
msgHandleThreadPool.execute(
new Runnable() {
@Override
public void run() {
msgHandler.onMessage(ctxF, in);
}
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -177,6 +178,10 @@ private void reconnect() {
}
}

public void setMsgHandleThreadPool(ExecutorService msgHandleThreadPool) {
channelHandler.setMsgHandleThreadPool(msgHandleThreadPool);
}

public List<ConnectionInfo> getConnectionInfoList() {
return connectionInfoList;
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;

Expand Down Expand Up @@ -77,6 +78,13 @@ static Network build(ConfigOption config, MsgHandler handler) {
*/
void removeConnection(String peerIpPort);

/**
* Set thread pool
*
* @param threadPool
*/
void setMsgHandleThreadPool(ThreadPoolExecutor threadPool);

/** Exit gracefully */
void stop();
}
7 changes: 6 additions & 1 deletion src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,7 +67,6 @@ public List<ConnectionInfo> getConnectionInfo() {

@Override
public void start() throws NetworkException {
// set ThreadPool ??
connManager.startConnect();
connManager.startReconnectSchedule();
}
Expand All @@ -81,6 +81,11 @@ public void removeConnection(String peerIpPort) {
connManager.removeConnection(peerIpPort);
}

@Override
public void setMsgHandleThreadPool(ThreadPoolExecutor threadPool) {
connManager.setMsgHandleThreadPool(threadPool);
}

@Override
public void stop() {
connManager.stopReconnectSchedule();
Expand Down