From 4b6fee40e4ac8dbbeedc69346f8865afdaa74488 Mon Sep 17 00:00:00 2001 From: chaychen2005 <605966652@qq.com> Date: Tue, 28 Jul 2020 15:32:13 +0800 Subject: [PATCH] fix channel bug --- .../fisco/bcos/sdk/channel/ChannelTest.java | 13 ++- .../org/fisco/bcos/sdk/channel/Channel.java | 4 + .../fisco/bcos/sdk/channel/ChannelImp.java | 109 +++++++++++++++++- .../bcos/sdk/channel/ChannelMsgHandler.java | 105 +++-------------- .../sdk/channel/model/ChannelRequest.java | 53 +++++++++ 5 files changed, 193 insertions(+), 91 deletions(-) create mode 100644 src/main/java/org/fisco/bcos/sdk/channel/model/ChannelRequest.java diff --git a/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java index 7eeb62fee..1a8e1f646 100644 --- a/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java +++ b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java @@ -16,7 +16,6 @@ package org.fisco.bcos.sdk.channel; import io.netty.channel.ChannelHandlerContext; -import org.fisco.bcos.sdk.config.ConfigException; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.network.MsgHandler; @@ -24,9 +23,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.fail; + public class ChannelTest { @Test - public void testConnect() throws ConfigException { + public void testConnect() { Logger logger = LoggerFactory.getLogger(ChannelImp.class); Channel channel = Channel.build("src/integration-test/resources/config-example.yaml"); class TestMsgHandler implements MsgHandler { @@ -47,5 +48,13 @@ public void onDisconnect(ChannelHandlerContext ctx) { channel.addConnectHandler(testMsgHandler); channel.addMessageHandler(MsgType.CHANNEL_RPC_REQUEST, testMsgHandler); channel.addDisconnectHandler(testMsgHandler); + try{ + channel.start(); + Thread.sleep(10000); + channel.stop(); + } catch (Exception e) { + e.printStackTrace(); + fail("Exception is not expected"); + } } } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java index 435b16c38..5d6fe8f9d 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java @@ -41,6 +41,10 @@ static Channel build(String filepath) { return new ChannelImp(filepath); } + void start(); + + void stop(); + /** * Add a message handler to handle specific type messages. When one message comes the handler * will be notified, handler.onMessage(ChannleHandlerContext ctx, Message msg) called. diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java index e6f7619fa..e70f41c73 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java @@ -15,6 +15,7 @@ package org.fisco.bcos.sdk.channel; +import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.channel.ChannelHandlerContext; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; @@ -23,9 +24,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.fisco.bcos.sdk.channel.model.ChannelPrococolExceiption; import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion; +import org.fisco.bcos.sdk.channel.model.HeartBeatParser; +import org.fisco.bcos.sdk.channel.model.NodeHeartbeat; import org.fisco.bcos.sdk.channel.model.Options; import org.fisco.bcos.sdk.config.Config; import org.fisco.bcos.sdk.config.ConfigException; @@ -33,7 +39,13 @@ import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.model.Response; -import org.fisco.bcos.sdk.network.*; +import org.fisco.bcos.sdk.network.ConnectionInfo; +import org.fisco.bcos.sdk.network.MsgHandler; +import org.fisco.bcos.sdk.network.Network; +import org.fisco.bcos.sdk.network.NetworkException; +import org.fisco.bcos.sdk.network.NetworkImp; +import org.fisco.bcos.sdk.utils.ChannelUtils; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,20 +62,40 @@ public class ChannelImp implements Channel { private Network network; private Map> groupId2PeerIpPortList; // upper module settings are required private Timer timeoutHandler = new HashedWheelTimer(); + private long heartBeatDelay = (long) 2000; + private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); public ChannelImp(String filepath) { try { ConfigOption config = Config.load(filepath); msgHandler = new ChannelMsgHandler(); network = new NetworkImp(config, msgHandler); - network.start(); } catch (ConfigException e) { logger.error("init channel config error, {} ", e.getMessage()); + } + } + + @Override + public void start() { + try { + network.start(); + startPeriodTask(); } catch (NetworkException e) { logger.error("init channel network error, {} ", e.getMessage()); } } + private void startPeriodTask() { + scheduledExecutorService.scheduleAtFixedRate( + () -> broadcastHeartbeat(), 0, heartBeatDelay, TimeUnit.MILLISECONDS); + } + + @Override + public void stop() { + scheduledExecutorService.shutdownNow(); + network.stop(); + } + @Override public void addConnectHandler(MsgHandler handler) { msgHandler.addConnectHandler(handler); @@ -291,4 +323,77 @@ public List getAvailablePeer() { public EnumChannelProtocolVersion getVersion() { return null; } + + private void broadcastHeartbeat() { + msgHandler + .getAvailablePeer() + .forEach( + (peer, ctx) -> { + sendHeartbeatMessage(ctx); + logger.trace("broadcastHeartbeat to {} success ", peer); + }); + } + + public void sendHeartbeatMessage(ChannelHandlerContext ctx) { + String seq = ChannelUtils.newSeq(); + Message message = new Message(); + + try { + message.setSeq(seq); + message.setResult(0); + message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.getType())); + HeartBeatParser heartBeatParser = + new HeartBeatParser(ChannelVersionNegotiation.getProtocolVersion(ctx)); + message.setData(heartBeatParser.encode("0")); + logger.trace( + "encodeHeartbeatToMessage, seq: {}, content: {}, messageType: {}", + message.getSeq(), + heartBeatParser.toString(), + message.getType()); + } catch (JsonProcessingException e) { + logger.error( + "sendHeartbeatMessage failed for decode the message exception, errorMessage: {}", + e.getMessage()); + return; + } + + ResponseCallback callback = + new ResponseCallback() { + @Override + public void onResponse(Response response) { + Boolean disconnect = true; + try { + if (response.getErrorCode() != 0) { + logger.error( + " channel protocol heartbeat request failed, code: {}, message: {}", + response.getErrorCode(), + response.getErrorMessage()); + throw new ChannelPrococolExceiption( + " channel protocol heartbeat request failed, code: " + + response.getErrorCode() + + ", message: " + + response.getErrorMessage()); + } + + NodeHeartbeat nodeHeartbeat = + ObjectMapperFactory.getObjectMapper() + .readValue(response.getContent(), NodeHeartbeat.class); + int heartBeat = nodeHeartbeat.getHeartBeat(); + logger.trace(" heartbeat packet, heartbeat is {} ", heartBeat); + disconnect = false; + } catch (Exception e) { + logger.error( + " channel protocol heartbeat failed, exception: {}", + e.getMessage()); + } + if (disconnect) { + String host = ChannelVersionNegotiation.getPeerHost(ctx); + network.removeConnection(host); + } + } + }; + + ctx.writeAndFlush(message); + msgHandler.addSeq2CallBack(seq, callback); + } } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java index 68a6326c5..2c7c6d488 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java @@ -24,12 +24,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.fisco.bcos.sdk.channel.model.*; import org.fisco.bcos.sdk.client.protocol.response.NodeVersion; -import org.fisco.bcos.sdk.model.JsonRpcRequest; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.model.Response; @@ -48,13 +44,11 @@ public class ChannelMsgHandler implements MsgHandler { private static Logger logger = LoggerFactory.getLogger(ChannelImp.class); private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper(); - private long heartBeatDelay = (long) 2000; private List msgConnectHandlerList = new ArrayList<>(); private List msgDisconnectHandleList = new ArrayList<>(); private Map msgHandlers = new ConcurrentHashMap<>(); private Map seq2Callback = new ConcurrentHashMap<>(); - private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); private Map availablePeer = new ConcurrentHashMap<>(); public Map getAvailablePeer() { @@ -79,7 +73,7 @@ public void addSeq2CallBack(String seq, ResponseCallback callback) { public void removeSeq(String seq) { seq2Callback.remove(seq); - }; + } private void addAvailablePeer(String host, ChannelHandlerContext ctx) { availablePeer.put(host, ctx); @@ -146,16 +140,15 @@ public void onDisconnect(ChannelHandlerContext ctx) { } private void queryNodeVersion(ChannelHandlerContext ctx) { - JsonRpcRequest request = new JsonRpcRequest("getClientVersion", Arrays.asList()); + ChannelRequest request = new ChannelRequest("getClientVersion", Arrays.asList()); String seq = ChannelUtils.newSeq(); Message message = new Message(); - try { - byte[] encodedData = objectMapper.writeValueAsBytes(request); + byte[] payload = objectMapper.writeValueAsBytes(request); message.setSeq(seq); message.setResult(0); - message.setType(Short.valueOf((short) MsgType.CHANNEL_RPC_REQUEST.ordinal())); - message.setData(encodedData); + message.setType((short) MsgType.CHANNEL_RPC_REQUEST.getType()); + message.setData(payload); logger.trace( "encodeRequestToMessage, seq: {}, method: {}, messageType: {}", message.getSeq(), @@ -209,37 +202,26 @@ public void onResponse(Response response) { if (EnumNodeVersion.channelProtocolHandleShakeSupport( nodeVersion.getResult().getSupportedVersion())) { // node support channel protocol handshake, start it - logger.info( - " support channel handshake node: {}, content: {}", - nodeVersion.getResult(), - response.getContent()); + logger.info(" support channel handshake node"); queryChannelProtocolVersion(ctx); - disconnect = false; } else { // default channel protocol - logger.info( - " not support channel handshake set default ,node: {}, content: {}", - nodeVersion.getResult(), - response.getContent()); + logger.info(" not support channel handshake set default"); ChannelVersionNegotiation.setProtocolVersion( ctx, EnumChannelProtocolVersion.VERSION_1, nodeVersion.getResult().getSupportedVersion()); } - + disconnect = false; } catch (Exception e) { logger.error(" query node version failed, message: {}", e.getMessage()); } if (disconnect) { - // TODO: disconnect + ctx.disconnect(); + ctx.close(); } else { String host = ChannelVersionNegotiation.getPeerHost(ctx); addAvailablePeer(host, ctx); - scheduledExecutorService.scheduleAtFixedRate( - () -> sendHeartbeatMessage(ctx), - 0, - heartBeatDelay, - TimeUnit.MILLISECONDS); } } }; @@ -259,8 +241,13 @@ private void queryChannelProtocolVersion(ChannelHandlerContext ctx) byte[] payload = objectMapper.writeValueAsBytes(channelHandshake); message.setSeq(seq); message.setResult(0); - message.setType(Short.valueOf((short) MsgType.CLIENT_HANDSHAKE.ordinal())); + message.setType(Short.valueOf((short) MsgType.CLIENT_HANDSHAKE.getType())); message.setData(payload); + logger.trace( + "encodeChannelHandshakeToMessage, seq: {}, data: {}, messageType: {}", + message.getSeq(), + channelHandshake.toString(), + message.getType()); } catch (JsonProcessingException e) { logger.error( "queryChannelProtocolVersion failed for decode the message exception, errorMessage: {}", @@ -311,64 +298,8 @@ public void onResponse(Response response) { e.getMessage()); } if (disconnect) { - // TODO: disconnect - } - } - }; - - ctx.writeAndFlush(message); - addSeq2CallBack(seq, callback); - } - - public void sendHeartbeatMessage(ChannelHandlerContext ctx) { - String seq = ChannelUtils.newSeq(); - Message message = new Message(); - - try { - message.setSeq(seq); - message.setResult(0); - message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.ordinal())); - HeartBeatParser heartBeatParser = - new HeartBeatParser(ChannelVersionNegotiation.getProtocolVersion(ctx)); - message.setData(heartBeatParser.encode("0")); - } catch (JsonProcessingException e) { - logger.error( - "sendHeartbeatMessage failed for decode the message exception, errorMessage: {}", - e.getMessage()); - return; - } - - ResponseCallback callback = - new ResponseCallback() { - @Override - public void onResponse(Response response) { - Boolean disconnect = true; - try { - if (response.getErrorCode() != 0) { - logger.error( - " channel protocol heartbeat request failed, code: {}, message: {}", - response.getErrorCode(), - response.getErrorMessage()); - throw new ChannelPrococolExceiption( - " channel protocol heartbeat request failed, code: " - + response.getErrorCode() - + ", message: " - + response.getErrorMessage()); - } - - NodeHeartbeat nodeHeartbeat = - objectMapper.readValue( - response.getContent(), NodeHeartbeat.class); - int heartBeat = nodeHeartbeat.getHeartBeat(); - logger.trace(" heartbeat packet, heartbeat is {} ", heartBeat); - disconnect = false; - } catch (Exception e) { - logger.error( - " channel protocol heartbeat failed, exception: {}", - e.getMessage()); - } - if (disconnect) { - // TODO: disconnect + ctx.disconnect(); + ctx.close(); } } }; diff --git a/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelRequest.java b/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelRequest.java new file mode 100644 index 000000000..370dcec5b --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/channel/model/ChannelRequest.java @@ -0,0 +1,53 @@ +/** + * Copyright 2014-2020 [fisco-dev] + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.fisco.bcos.sdk.channel.model; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public class ChannelRequest { + // for set the json id + private static AtomicLong nextIdGetter = new AtomicLong(0); + // the jsonrpc version, default is 2.0 + private String jsonrpc = "2.0"; + // rpc method + private String method; + // params for the rpc interface + private List params; + // the json rpc request id + private long id; + + public ChannelRequest(String method, List params) { + this.method = method; + this.params = params; + this.id = nextIdGetter.getAndIncrement(); + } + + // getter and setter for the class members + public String getJsonrpc() { + return this.jsonrpc; + } + + public String getMethod() { + return this.method; + } + + public long getId() { + return this.id; + } + + public List getParams() { + return this.params; + } +}