Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ java-sdk.iml
.project
.settings/
bin/
out/

## integration test files
nodes/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,33 @@

package org.fisco.bcos.sdk.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.ChannelHandlerContext;
import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion;
import org.fisco.bcos.sdk.channel.model.HeartBeatParser;
import org.fisco.bcos.sdk.channel.model.NodeHeartbeat;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.fisco.bcos.sdk.utils.ChannelUtils;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.junit.Assert.fail;

public class ChannelTest {
private Logger logger = LoggerFactory.getLogger(ChannelImp.class);
private Channel channel;

@Test
public void testConnect() {
Logger logger = LoggerFactory.getLogger(ChannelImp.class);
Channel channel = Channel.build("src/integration-test/resources/config-example.yaml");
channel = Channel.build("src/integration-test/resources/config-example.yaml");
class TestMsgHandler implements MsgHandler {
@Override
public void onConnect(ChannelHandlerContext ctx) {
Expand All @@ -50,11 +62,63 @@ public void onDisconnect(ChannelHandlerContext ctx) {
channel.addDisconnectHandler(testMsgHandler);
try{
channel.start();
sendMessage();
Thread.sleep(10000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid use Thread.sleep, it makes the test takes long time.

channel.stop();
} catch (Exception e) {
e.printStackTrace();
fail("Exception is not expected");
}
}

// use heart beat for case to send
private void sendMessage() {
List<String> peers = channel.getAvailablePeer();
if (peers.size() == 0) {
fail("Empty available peer");
}
String host = peers.get(0);
Message message = new Message();
try {
message.setSeq(ChannelUtils.newSeq());
message.setResult(0);
message.setType(Short.valueOf((short) MsgType.CLIENT_HEARTBEAT.getType()));
HeartBeatParser heartBeatParser = new HeartBeatParser(EnumChannelProtocolVersion.VERSION_1);
message.setData(heartBeatParser.encode("0"));
logger.trace(
"encodeHeartbeatToMessage, seq: {}, content: {}, messageType: {}",
message.getSeq(),
heartBeatParser.toString(),
message.getType());
} catch (JsonProcessingException e) {
logger.error(
"sendHeartbeatMessage failed for decode the message exception, errorMessage: {}",
e.getMessage());
return;
}

ResponseCallback callback =
new ResponseCallback() {
@Override
public void onResponse(Response response) {
try {
NodeHeartbeat nodeHeartbeat =
ObjectMapperFactory.getObjectMapper()
.readValue(response.getContent(), NodeHeartbeat.class);
int heartBeat = nodeHeartbeat.getHeartBeat();
logger.trace(" heartbeat packet in ChannelTest, heartbeat is {} ", heartBeat);
if (heartBeat != 1) {
fail("heartbeat packet in ChannelTest fail");
}
} catch (Exception e) {
fail(" channel protocol heartbeat failed, exception: " + e.getMessage());
}
}
};

logger.info(" test sendToPeer");
channel.sendToPeer(message, host);
logger.info(" test asyncSendToPeer");
channel.asyncSendToPeer(message, host, callback, new Options());
}
}
90 changes: 75 additions & 15 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.model.ChannelMessageError;
import org.fisco.bcos.sdk.channel.model.ChannelPrococolExceiption;
import org.fisco.bcos.sdk.channel.model.EnumChannelProtocolVersion;
import org.fisco.bcos.sdk.channel.model.HeartBeatParser;
Expand Down Expand Up @@ -57,6 +59,8 @@
public class ChannelImp implements Channel {

private static Logger logger = LoggerFactory.getLogger(ChannelImp.class);
private Integer connectSeconds = 30;
private Integer connectSleepPerMillis = 30;

private ChannelMsgHandler msgHandler;
private Network network;
Expand All @@ -79,13 +83,57 @@ public ChannelImp(String filepath) {
public void start() {
try {
network.start();
startPeriodTask();
checkConnectionsToStartPeriodTask();
} catch (NetworkException e) {
logger.error("init channel network error, {} ", e.getMessage());
}
}

private void checkConnectionsToStartPeriodTask() {
try {
int sleepTime = 0;
while (true) {
if (getAvailablePeer().size() > 0 || sleepTime > connectSeconds * 1000) {
break;
} else {
Thread.sleep(connectSleepPerMillis);
sleepTime += connectSleepPerMillis;
}
}

List<String> peers = getAvailablePeer();
String connectionInfoStr = "";
for (String peer : peers) {
connectionInfoStr += peer + ", ";
}

String baseMessage =
" nodes: "
+ connectionInfoStr
+ "java version: "
+ System.getProperty("java.version")
+ " ,java vendor: "
+ System.getProperty("java.vm.vendor");

if (getAvailablePeer().size() == 0) {
String errorMessage = " Failed to connect to " + baseMessage;
logger.error(errorMessage);
throw new Exception(errorMessage);
}

logger.info(" Connect to " + baseMessage);

startPeriodTask();
} catch (InterruptedException e) {
logger.warn(" thread interrupted exception: ", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error(" service init failed, error message: {}, error: ", e.getMessage(), e);
}
}

private void startPeriodTask() {
/** periodically send heartbeat message to all connected node, default period : 2s */
scheduledExecutorService.scheduleAtFixedRate(
() -> broadcastHeartbeat(), 0, heartBeatDelay, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -264,25 +312,33 @@ public void asyncSendToPeer(
Message out, String peerIpPort, ResponseCallback callback, Options options) {
msgHandler.addSeq2CallBack(out.getSeq(), callback);
ChannelHandlerContext ctx = msgHandler.getAvailablePeer().get(peerIpPort);
if (options.getTimeout() > 0) {
callback.setTimeout(
timeoutHandler.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) {
// handle timer
callback.onTimeout();
msgHandler.removeSeq(out.getSeq());
}
},
options.getTimeout(),
TimeUnit.MILLISECONDS));
}
if (ctx != null) {
if (options.getTimeout() > 0) {
callback.setTimeout(
timeoutHandler.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) {
// handle timer
callback.onTimeout();
msgHandler.removeSeq(out.getSeq());
}
},
options.getTimeout(),
TimeUnit.MILLISECONDS));
}
ctx.writeAndFlush(out);
logger.debug("send message to {} success ", peerIpPort);
} else {
logger.debug("send message to {} failed ", peerIpPort);
Response response = new Response();
response.setErrorCode(ChannelMessageError.CONNECTION_INVALID.getError());
response.setErrorMessage(
"The connection to peer "
+ ChannelVersionNegotiation.getPeerHost(ctx)
+ " is invalid.");
response.setContent("");
callback.onResponse(response);
}
}

Expand Down Expand Up @@ -396,4 +452,8 @@ public void onResponse(Response response) {
ctx.writeAndFlush(message);
msgHandler.addSeq2CallBack(seq, callback);
}

public void setThreadPool(ThreadPoolExecutor threadPool) {
network.setMsgHandleThreadPool(threadPool);
}
}
14 changes: 10 additions & 4 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ public void onConnect(ChannelHandlerContext ctx) {
@Override
public void onMessage(ChannelHandlerContext ctx, Message msg) {
logger.debug(
"onMessage in ChannelMsgHandler called, host : {}, msgType : {}",
"onMessage in ChannelMsgHandler called, host : {}, seq : {}, msgType : {}",
ChannelVersionNegotiation.getPeerHost(ctx),
msg.getSeq(),
(int) msg.getType());
ResponseCallback callback = getAndRemoveSeq(msg.getSeq());

Expand All @@ -124,6 +125,11 @@ public void onMessage(ChannelHandlerContext ctx, Message msg) {
response.setContent(new String(msg.getData()));
callback.onResponse(response);
} else {
logger.trace(
" receive response with invalid seq, type: {}, result: {}, content: {}",
(int) msg.getType(),
msg.getResult(),
new String(msg.getData()));
MsgHandler msgHandler = msgHandlers.get(msg.getType());
msgHandler.onMessage(ctx, msg);
}
Expand Down Expand Up @@ -219,9 +225,6 @@ public void onResponse(Response response) {
if (disconnect) {
ctx.disconnect();
ctx.close();
} else {
String host = ChannelVersionNegotiation.getPeerHost(ctx);
addAvailablePeer(host, ctx);
}
}
};
Expand Down Expand Up @@ -300,6 +303,9 @@ public void onResponse(Response response) {
if (disconnect) {
ctx.disconnect();
ctx.close();
} else {
String host = ChannelVersionNegotiation.getPeerHost(ctx);
addAvailablePeer(host, ctx);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

public enum ChannelMessageError {
MESSAGE_TIMEOUT(102), // timeout
INTERNAL_MESSAGE_HANDLE_FAILED(-5000);
INTERNAL_MESSAGE_HANDLE_FAILED(-5000),
CONNECTION_INVALID(-5001);

private int error;

Expand Down