diff --git a/.gitignore b/.gitignore index 1b3636387..cd3c635b0 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ java-sdk.iml .project .settings/ bin/ +out/ ## integration test files nodes/ diff --git a/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java index 1a8e1f646..1f931f03f 100644 --- a/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java +++ b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java @@ -15,21 +15,33 @@ package org.fisco.bcos.sdk.channel; +import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.channel.ChannelHandlerContext; +import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion; +import org.fisco.bcos.sdk.channel.model.HeartBeatParser; +import org.fisco.bcos.sdk.channel.model.NodeHeartbeat; +import org.fisco.bcos.sdk.channel.model.Options; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; +import org.fisco.bcos.sdk.model.Response; import org.fisco.bcos.sdk.network.MsgHandler; +import org.fisco.bcos.sdk.utils.ChannelUtils; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + import static org.junit.Assert.fail; public class ChannelTest { + private Logger logger = LoggerFactory.getLogger(ChannelImp.class); + private Channel channel; + @Test public void testConnect() { - Logger logger = LoggerFactory.getLogger(ChannelImp.class); - Channel channel = Channel.build("src/integration-test/resources/config-example.yaml"); + channel = Channel.build("src/integration-test/resources/config-example.yaml"); class TestMsgHandler implements MsgHandler { @Override public void onConnect(ChannelHandlerContext ctx) { @@ -50,6 +62,7 @@ public void onDisconnect(ChannelHandlerContext ctx) { channel.addDisconnectHandler(testMsgHandler); try{ channel.start(); + sendMessage(); Thread.sleep(10000); channel.stop(); } catch (Exception e) { @@ -57,4 +70,55 @@ public void onDisconnect(ChannelHandlerContext ctx) { fail("Exception is not expected"); } } + + // use heart beat for case to send + private void sendMessage() { + List peers = channel.getAvailablePeer(); + if (peers.size() == 0) { + fail("Empty available peer"); + } + String host = peers.get(0); + Message message = new Message(); + try { + message.setSeq(ChannelUtils.newSeq()); + message.setResult(0); + message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.getType())); + HeartBeatParser heartBeatParser = new HeartBeatParser(EnumChannelProtocolVersion.VERSION_1); + message.setData(heartBeatParser.encode("0")); + logger.trace( + "encodeHeartbeatToMessage, seq: {}, content: {}, messageType: {}", + message.getSeq(), + heartBeatParser.toString(), + message.getType()); + } catch (JsonProcessingException e) { + logger.error( + "sendHeartbeatMessage failed for decode the message exception, errorMessage: {}", + e.getMessage()); + return; + } + + ResponseCallback callback = + new ResponseCallback() { + @Override + public void onResponse(Response response) { + try { + NodeHeartbeat nodeHeartbeat = + ObjectMapperFactory.getObjectMapper() + .readValue(response.getContent(), NodeHeartbeat.class); + int heartBeat = nodeHeartbeat.getHeartBeat(); + logger.trace(" heartbeat packet in ChannelTest, heartbeat is {} ", heartBeat); + if (heartBeat != 1) { + fail("heartbeat packet in ChannelTest fail"); + } + } catch (Exception e) { + fail(" channel protocol heartbeat failed, exception: " + e.getMessage()); + } + } + }; + + logger.info(" test sendToPeer"); + channel.sendToPeer(message, host); + logger.info(" test asyncSendToPeer"); + channel.asyncSendToPeer(message, host, callback, new Options()); + } } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java index e70f41c73..ddf90afd1 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java @@ -27,7 +27,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.fisco.bcos.sdk.channel.model.ChannelMessageError; import org.fisco.bcos.sdk.channel.model.ChannelPrococolExceiption; import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion; import org.fisco.bcos.sdk.channel.model.HeartBeatParser; @@ -57,6 +59,8 @@ public class ChannelImp implements Channel { private static Logger logger = LoggerFactory.getLogger(ChannelImp.class); + private Integer connectSeconds = 30; + private Integer connectSleepPerMillis = 30; private ChannelMsgHandler msgHandler; private Network network; @@ -79,13 +83,57 @@ public ChannelImp(String filepath) { public void start() { try { network.start(); - startPeriodTask(); + checkConnectionsToStartPeriodTask(); } catch (NetworkException e) { logger.error("init channel network error, {} ", e.getMessage()); } } + private void checkConnectionsToStartPeriodTask() { + try { + int sleepTime = 0; + while (true) { + if (getAvailablePeer().size() > 0 || sleepTime > connectSeconds * 1000) { + break; + } else { + Thread.sleep(connectSleepPerMillis); + sleepTime += connectSleepPerMillis; + } + } + + List peers = getAvailablePeer(); + String connectionInfoStr = ""; + for (String peer : peers) { + connectionInfoStr += peer + ", "; + } + + String baseMessage = + " nodes: " + + connectionInfoStr + + "java version: " + + System.getProperty("java.version") + + " ,java vendor: " + + System.getProperty("java.vm.vendor"); + + if (getAvailablePeer().size() == 0) { + String errorMessage = " Failed to connect to " + baseMessage; + logger.error(errorMessage); + throw new Exception(errorMessage); + } + + logger.info(" Connect to " + baseMessage); + + startPeriodTask(); + } catch (InterruptedException e) { + logger.warn(" thread interrupted exception: ", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.error(" service init failed, error message: {}, error: ", e.getMessage(), e); + } + } + private void startPeriodTask() { + /** periodically send heartbeat message to all connected node, default period : 2s */ scheduledExecutorService.scheduleAtFixedRate( () -> broadcastHeartbeat(), 0, heartBeatDelay, TimeUnit.MILLISECONDS); } @@ -264,25 +312,33 @@ public void asyncSendToPeer( Message out, String peerIpPort, ResponseCallback callback, Options options) { msgHandler.addSeq2CallBack(out.getSeq(), callback); ChannelHandlerContext ctx = msgHandler.getAvailablePeer().get(peerIpPort); - if (options.getTimeout() > 0) { - callback.setTimeout( - timeoutHandler.newTimeout( - new TimerTask() { - @Override - public void run(Timeout timeout) { - // handle timer - callback.onTimeout(); - msgHandler.removeSeq(out.getSeq()); - } - }, - options.getTimeout(), - TimeUnit.MILLISECONDS)); - } if (ctx != null) { + if (options.getTimeout() > 0) { + callback.setTimeout( + timeoutHandler.newTimeout( + new TimerTask() { + @Override + public void run(Timeout timeout) { + // handle timer + callback.onTimeout(); + msgHandler.removeSeq(out.getSeq()); + } + }, + options.getTimeout(), + TimeUnit.MILLISECONDS)); + } ctx.writeAndFlush(out); logger.debug("send message to {} success ", peerIpPort); } else { logger.debug("send message to {} failed ", peerIpPort); + Response response = new Response(); + response.setErrorCode(ChannelMessageError.CONNECTION_INVALID.getError()); + response.setErrorMessage( + "The connection to peer " + + ChannelVersionNegotiation.getPeerHost(ctx) + + " is invalid."); + response.setContent(""); + callback.onResponse(response); } } @@ -396,4 +452,8 @@ public void onResponse(Response response) { ctx.writeAndFlush(message); msgHandler.addSeq2CallBack(seq, callback); } + + public void setThreadPool(ThreadPoolExecutor threadPool) { + network.setMsgHandleThreadPool(threadPool); + } } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java index 2c7c6d488..7f040825d 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java @@ -99,8 +99,9 @@ public void onConnect(ChannelHandlerContext ctx) { @Override public void onMessage(ChannelHandlerContext ctx, Message msg) { logger.debug( - "onMessage in ChannelMsgHandler called, host : {}, msgType : {}", + "onMessage in ChannelMsgHandler called, host : {}, seq : {}, msgType : {}", ChannelVersionNegotiation.getPeerHost(ctx), + msg.getSeq(), (int) msg.getType()); ResponseCallback callback = getAndRemoveSeq(msg.getSeq()); @@ -124,6 +125,11 @@ public void onMessage(ChannelHandlerContext ctx, Message msg) { response.setContent(new String(msg.getData())); callback.onResponse(response); } else { + logger.trace( + " receive response with invalid seq, type: {}, result: {}, content: {}", + (int) msg.getType(), + msg.getResult(), + new String(msg.getData())); MsgHandler msgHandler = msgHandlers.get(msg.getType()); msgHandler.onMessage(ctx, msg); } @@ -219,9 +225,6 @@ public void onResponse(Response response) { if (disconnect) { ctx.disconnect(); ctx.close(); - } else { - String host = ChannelVersionNegotiation.getPeerHost(ctx); - addAvailablePeer(host, ctx); } } }; @@ -300,6 +303,9 @@ public void onResponse(Response response) { if (disconnect) { ctx.disconnect(); ctx.close(); + } else { + String host = ChannelVersionNegotiation.getPeerHost(ctx); + addAvailablePeer(host, ctx); } } }; diff --git a/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelMessageError.java b/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelMessageError.java index e0440af74..6788ee0ff 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelMessageError.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelMessageError.java @@ -2,7 +2,8 @@ public enum ChannelMessageError { MESSAGE_TIMEOUT(102), // timeout - INTERNAL_MESSAGE_HANDLE_FAILED(-5000); + INTERNAL_MESSAGE_HANDLE_FAILED(-5000), + CONNECTION_INVALID(-5001); private int error;