Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
package org.fisco.bcos.sdk.channel;

import io.netty.channel.ChannelHandlerContext;
import org.fisco.bcos.sdk.config.ConfigException;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.fail;

public class ChannelTest {
@Test
public void testConnect() throws ConfigException {
public void testConnect() {
Logger logger = LoggerFactory.getLogger(ChannelImp.class);
Channel channel = Channel.build("src/integration-test/resources/config-example.yaml");
class TestMsgHandler implements MsgHandler {
Expand All @@ -47,5 +48,13 @@ public void onDisconnect(ChannelHandlerContext ctx) {
channel.addConnectHandler(testMsgHandler);
channel.addMessageHandler(MsgType.CHANNEL_RPC_REQUEST, testMsgHandler);
channel.addDisconnectHandler(testMsgHandler);
try{
channel.start();
Thread.sleep(10000);
channel.stop();
} catch (Exception e) {
e.printStackTrace();
fail("Exception is not expected");
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ static Channel build(String filepath) {
return new ChannelImp(filepath);
}

void start();

void stop();

/**
* Add a message handler to handle specific type messages. When one message comes the handler
* will be notified, handler.onMessage(ChannleHandlerContext ctx, Message msg) called.
Expand Down
109 changes: 107 additions & 2 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.fisco.bcos.sdk.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
Expand All @@ -23,17 +24,28 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.model.ChannelPrococolExceiption;
import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion;
import org.fisco.bcos.sdk.channel.model.HeartBeatParser;
import org.fisco.bcos.sdk.channel.model.NodeHeartbeat;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.config.Config;
import org.fisco.bcos.sdk.config.ConfigException;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.network.*;
import org.fisco.bcos.sdk.network.ConnectionInfo;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.fisco.bcos.sdk.network.Network;
import org.fisco.bcos.sdk.network.NetworkException;
import org.fisco.bcos.sdk.network.NetworkImp;
import org.fisco.bcos.sdk.utils.ChannelUtils;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,20 +62,40 @@ public class ChannelImp implements Channel {
private Network network;
private Map<String, List<String>> groupId2PeerIpPortList; // upper module settings are required
private Timer timeoutHandler = new HashedWheelTimer();
private long heartBeatDelay = (long) 2000;
private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

public ChannelImp(String filepath) {
try {
ConfigOption config = Config.load(filepath);
msgHandler = new ChannelMsgHandler();
network = new NetworkImp(config, msgHandler);
network.start();
} catch (ConfigException e) {
logger.error("init channel config error, {} ", e.getMessage());
}
}

@Override
public void start() {
try {
network.start();
startPeriodTask();
} catch (NetworkException e) {
logger.error("init channel network error, {} ", e.getMessage());
}
}

private void startPeriodTask() {
scheduledExecutorService.scheduleAtFixedRate(
() -> broadcastHeartbeat(), 0, heartBeatDelay, TimeUnit.MILLISECONDS);
}

@Override
public void stop() {
scheduledExecutorService.shutdownNow();
network.stop();
}

@Override
public void addConnectHandler(MsgHandler handler) {
msgHandler.addConnectHandler(handler);
Expand Down Expand Up @@ -291,4 +323,77 @@ public List<String> getAvailablePeer() {
public EnumChannelProtocolVersion getVersion() {
return null;
}

private void broadcastHeartbeat() {
msgHandler
.getAvailablePeer()
.forEach(
(peer, ctx) -> {
sendHeartbeatMessage(ctx);
logger.trace("broadcastHeartbeat to {} success ", peer);
});
}

public void sendHeartbeatMessage(ChannelHandlerContext ctx) {
String seq = ChannelUtils.newSeq();
Message message = new Message();

try {
message.setSeq(seq);
message.setResult(0);
message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.getType()));
HeartBeatParser heartBeatParser =
new HeartBeatParser(ChannelVersionNegotiation.getProtocolVersion(ctx));
message.setData(heartBeatParser.encode("0"));
logger.trace(
"encodeHeartbeatToMessage, seq: {}, content: {}, messageType: {}",
message.getSeq(),
heartBeatParser.toString(),
message.getType());
} catch (JsonProcessingException e) {
logger.error(
"sendHeartbeatMessage failed for decode the message exception, errorMessage: {}",
e.getMessage());
return;
}

ResponseCallback callback =
new ResponseCallback() {
@Override
public void onResponse(Response response) {
Boolean disconnect = true;
try {
if (response.getErrorCode() != 0) {
logger.error(
" channel protocol heartbeat request failed, code: {}, message: {}",
response.getErrorCode(),
response.getErrorMessage());
throw new ChannelPrococolExceiption(
" channel protocol heartbeat request failed, code: "
+ response.getErrorCode()
+ ", message: "
+ response.getErrorMessage());
}

NodeHeartbeat nodeHeartbeat =
ObjectMapperFactory.getObjectMapper()
.readValue(response.getContent(), NodeHeartbeat.class);
int heartBeat = nodeHeartbeat.getHeartBeat();
logger.trace(" heartbeat packet, heartbeat is {} ", heartBeat);
disconnect = false;
} catch (Exception e) {
logger.error(
" channel protocol heartbeat failed, exception: {}",
e.getMessage());
}
if (disconnect) {
String host = ChannelVersionNegotiation.getPeerHost(ctx);
network.removeConnection(host);
}
}
};

ctx.writeAndFlush(message);
msgHandler.addSeq2CallBack(seq, callback);
}
}
105 changes: 18 additions & 87 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.model.*;
import org.fisco.bcos.sdk.client.protocol.response.NodeVersion;
import org.fisco.bcos.sdk.model.JsonRpcRequest;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
Expand All @@ -48,13 +44,11 @@ public class ChannelMsgHandler implements MsgHandler {

private static Logger logger = LoggerFactory.getLogger(ChannelImp.class);
private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();
private long heartBeatDelay = (long) 2000;

private List<MsgHandler> msgConnectHandlerList = new ArrayList<>();
private List<MsgHandler> msgDisconnectHandleList = new ArrayList<>();
private Map<MsgType, MsgHandler> msgHandlers = new ConcurrentHashMap<>();
private Map<String, ResponseCallback> seq2Callback = new ConcurrentHashMap<>();
private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
private Map<String, ChannelHandlerContext> availablePeer = new ConcurrentHashMap<>();

public Map<String, ChannelHandlerContext> getAvailablePeer() {
Expand All @@ -79,7 +73,7 @@ public void addSeq2CallBack(String seq, ResponseCallback callback) {

public void removeSeq(String seq) {
seq2Callback.remove(seq);
};
}

private void addAvailablePeer(String host, ChannelHandlerContext ctx) {
availablePeer.put(host, ctx);
Expand Down Expand Up @@ -146,16 +140,15 @@ public void onDisconnect(ChannelHandlerContext ctx) {
}

private void queryNodeVersion(ChannelHandlerContext ctx) {
JsonRpcRequest request = new JsonRpcRequest("getClientVersion", Arrays.asList());
ChannelRequest request = new ChannelRequest("getClientVersion", Arrays.asList());
String seq = ChannelUtils.newSeq();
Message message = new Message();

try {
byte[] encodedData = objectMapper.writeValueAsBytes(request);
byte[] payload = objectMapper.writeValueAsBytes(request);
message.setSeq(seq);
message.setResult(0);
message.setType(Short.valueOf((short) MsgType.CHANNEL_RPC_REQUEST.ordinal()));
message.setData(encodedData);
message.setType((short) MsgType.CHANNEL_RPC_REQUEST.getType());
message.setData(payload);
logger.trace(
"encodeRequestToMessage, seq: {}, method: {}, messageType: {}",
message.getSeq(),
Expand Down Expand Up @@ -209,37 +202,26 @@ public void onResponse(Response response) {
if (EnumNodeVersion.channelProtocolHandleShakeSupport(
nodeVersion.getResult().getSupportedVersion())) {
// node support channel protocol handshake, start it
logger.info(
" support channel handshake node: {}, content: {}",
nodeVersion.getResult(),
response.getContent());
logger.info(" support channel handshake node");
queryChannelProtocolVersion(ctx);
disconnect = false;
} else { // default channel protocol
logger.info(
" not support channel handshake set default ,node: {}, content: {}",
nodeVersion.getResult(),
response.getContent());
logger.info(" not support channel handshake set default");
ChannelVersionNegotiation.setProtocolVersion(
ctx,
EnumChannelProtocolVersion.VERSION_1,
nodeVersion.getResult().getSupportedVersion());
}

disconnect = false;
} catch (Exception e) {
logger.error(" query node version failed, message: {}", e.getMessage());
}

if (disconnect) {
// TODO: disconnect
ctx.disconnect();
ctx.close();
} else {
String host = ChannelVersionNegotiation.getPeerHost(ctx);
addAvailablePeer(host, ctx);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendHeartbeatMessage(ctx),
0,
heartBeatDelay,
TimeUnit.MILLISECONDS);
}
}
};
Expand All @@ -259,8 +241,13 @@ private void queryChannelProtocolVersion(ChannelHandlerContext ctx)
byte[] payload = objectMapper.writeValueAsBytes(channelHandshake);
message.setSeq(seq);
message.setResult(0);
message.setType(Short.valueOf((short) MsgType.CLIENT_HANDSHAKE.ordinal()));
message.setType(Short.valueOf((short) MsgType.CLIENT_HANDSHAKE.getType()));
message.setData(payload);
logger.trace(
"encodeChannelHandshakeToMessage, seq: {}, data: {}, messageType: {}",
message.getSeq(),
channelHandshake.toString(),
message.getType());
} catch (JsonProcessingException e) {
logger.error(
"queryChannelProtocolVersion failed for decode the message exception, errorMessage: {}",
Expand Down Expand Up @@ -311,64 +298,8 @@ public void onResponse(Response response) {
e.getMessage());
}
if (disconnect) {
// TODO: disconnect
}
}
};

ctx.writeAndFlush(message);
addSeq2CallBack(seq, callback);
}

public void sendHeartbeatMessage(ChannelHandlerContext ctx) {
String seq = ChannelUtils.newSeq();
Message message = new Message();

try {
message.setSeq(seq);
message.setResult(0);
message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.ordinal()));
HeartBeatParser heartBeatParser =
new HeartBeatParser(ChannelVersionNegotiation.getProtocolVersion(ctx));
message.setData(heartBeatParser.encode("0"));
} catch (JsonProcessingException e) {
logger.error(
"sendHeartbeatMessage failed for decode the message exception, errorMessage: {}",
e.getMessage());
return;
}

ResponseCallback callback =
new ResponseCallback() {
@Override
public void onResponse(Response response) {
Boolean disconnect = true;
try {
if (response.getErrorCode() != 0) {
logger.error(
" channel protocol heartbeat request failed, code: {}, message: {}",
response.getErrorCode(),
response.getErrorMessage());
throw new ChannelPrococolExceiption(
" channel protocol heartbeat request failed, code: "
+ response.getErrorCode()
+ ", message: "
+ response.getErrorMessage());
}

NodeHeartbeat nodeHeartbeat =
objectMapper.readValue(
response.getContent(), NodeHeartbeat.class);
int heartBeat = nodeHeartbeat.getHeartBeat();
logger.trace(" heartbeat packet, heartbeat is {} ", heartBeat);
disconnect = false;
} catch (Exception e) {
logger.error(
" channel protocol heartbeat failed, exception: {}",
e.getMessage());
}
if (disconnect) {
// TODO: disconnect
ctx.disconnect();
ctx.close();
}
}
};
Expand Down
Loading