diff --git a/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java new file mode 100644 index 000000000..7eeb62fee --- /dev/null +++ b/src/integration-test/java/org/fisco/bcos/sdk/channel/ChannelTest.java @@ -0,0 +1,51 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.channel; + +import io.netty.channel.ChannelHandlerContext; +import org.fisco.bcos.sdk.config.ConfigException; +import org.fisco.bcos.sdk.model.Message; +import org.fisco.bcos.sdk.model.MsgType; +import org.fisco.bcos.sdk.network.MsgHandler; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ChannelTest { + @Test + public void testConnect() throws ConfigException { + Logger logger = LoggerFactory.getLogger(ChannelImp.class); + Channel channel = Channel.build("src/integration-test/resources/config-example.yaml"); + class TestMsgHandler implements MsgHandler { + @Override + public void onConnect(ChannelHandlerContext ctx) { + logger.info("OnConnect in ChannelTest called: "+ctx.channel().remoteAddress()); + } + @Override + public void onMessage(ChannelHandlerContext ctx, Message msg) { + logger.info("onMessage in ChannelTest called: "+ctx.channel().remoteAddress()); + } + @Override + public void onDisconnect(ChannelHandlerContext ctx) { + logger.info("onDisconnect in ChannelTest called: "+ctx.channel().remoteAddress()); + } + } + TestMsgHandler testMsgHandler = new TestMsgHandler(); + channel.addConnectHandler(testMsgHandler); + channel.addMessageHandler(MsgType.CHANNEL_RPC_REQUEST, testMsgHandler); + channel.addDisconnectHandler(testMsgHandler); + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java index 5e4be7785..d192e0f59 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java @@ -16,6 +16,7 @@ package org.fisco.bcos.sdk.channel; import java.util.List; +import org.fisco.bcos.sdk.channel.model.Options; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.model.Response; @@ -110,16 +111,19 @@ static Channel build(String filepath) { * @param out: Message to be sent * @param peerIpPort: Remote ip:port information * @param callback: Response callback + * @param options: Include timeout */ - void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback); + void asyncSendToPeer( + Message out, String peerIpPort, ResponseCallback callback, Options options); /** * Asynchronous interface, send to an random peer * * @param out: Message to be sent * @param callback: Response callback + * @param options: Include timeout */ - void asyncSendToRandom(Message out, ResponseCallback callback); + void asyncSendToRandom(Message out, ResponseCallback callback, Options options); /** * Asynchronous interface, send message to peer select by client`s rule @@ -127,8 +131,10 @@ static Channel build(String filepath) { * @param out: Message to be sent * @param rule: Rule set by client * @param callback: Response callback + * @param options: Include timeout */ - void asyncSendToPeerByRule(Message out, PeerSelectRule rule, ResponseCallback callback); + void asyncSendToPeerByRule( + Message out, PeerSelectRule rule, ResponseCallback callback, Options options); /** * Get connection information diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java index 28b3e955a..4923dd708 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java @@ -16,20 +16,23 @@ package org.fisco.bcos.sdk.channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.fisco.bcos.sdk.channel.model.Options; import org.fisco.bcos.sdk.config.Config; import org.fisco.bcos.sdk.config.ConfigException; import org.fisco.bcos.sdk.config.ConfigOption; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.model.Response; -import org.fisco.bcos.sdk.network.ConnectionInfo; -import org.fisco.bcos.sdk.network.MsgHandler; -import org.fisco.bcos.sdk.network.Network; -import org.fisco.bcos.sdk.network.NetworkImp; +import org.fisco.bcos.sdk.network.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,14 +48,18 @@ public class ChannelImp implements Channel { private ChannelMsgHandler msgHandler; private Network network; private Map> groupId2PeerIpPortList; // upper module settings are required + private Timer timeoutHandler = new HashedWheelTimer(); public ChannelImp(String filepath) { try { ConfigOption config = Config.load(filepath); msgHandler = new ChannelMsgHandler(); network = new NetworkImp(config, msgHandler); + network.start(); } catch (ConfigException e) { - logger.error("init channel error, {} ", e.getMessage()); + logger.error("init channel config error, {} ", e.getMessage()); + } catch (NetworkException e) { + logger.error("init channel network error, {} ", e.getMessage()); } } @@ -126,7 +133,7 @@ public void onResponse(Response response) { } Callback callback = new Callback(); - asyncSendToPeer(out, peerIpPort, callback); + asyncSendToPeer(out, peerIpPort, callback, new Options()); try { callback.semaphore.acquire(1); } catch (InterruptedException e) { @@ -167,7 +174,7 @@ public void onResponse(Response response) { } Callback callback = new Callback(); - asyncSendToRandom(out, callback); + asyncSendToRandom(out, callback, new Options()); try { callback.semaphore.acquire(1); } catch (InterruptedException e) { @@ -208,7 +215,7 @@ public void onResponse(Response response) { } Callback callback = new Callback(); - asyncSendToPeerByRule(out, rule, callback); + asyncSendToPeerByRule(out, rule, callback, new Options()); try { callback.semaphore.acquire(1); } catch (InterruptedException e) { @@ -220,9 +227,24 @@ public void onResponse(Response response) { } @Override - public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback) { + public void asyncSendToPeer( + Message out, String peerIpPort, ResponseCallback callback, Options options) { msgHandler.addSeq2CallBack(out.getSeq(), callback); ChannelHandlerContext ctx = msgHandler.getAvailablePeer().get(peerIpPort); + if (options.getTimeout() > 0) { + callback.setTimeout( + timeoutHandler.newTimeout( + new TimerTask() { + @Override + public void run(Timeout timeout) { + // handle timer + callback.onTimeout(); + msgHandler.removeSeq(out.getSeq()); + } + }, + options.getTimeout(), + TimeUnit.MILLISECONDS)); + } if (ctx != null) { ctx.writeAndFlush(out); logger.debug("send message to {} success ", peerIpPort); @@ -232,18 +254,19 @@ public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback cal } @Override - public void asyncSendToRandom(Message out, ResponseCallback callback) { + public void asyncSendToRandom(Message out, ResponseCallback callback, Options options) { List peerList = getAvailablePeer(); int random = (int) (Math.random() * (peerList.size())); String peerIpPort = peerList.get(random); logger.debug("send message to random peer {} ", peerIpPort); - asyncSendToPeer(out, peerIpPort, callback); + asyncSendToPeer(out, peerIpPort, callback, options); } @Override - public void asyncSendToPeerByRule(Message out, PeerSelectRule rule, ResponseCallback callback) { + public void asyncSendToPeerByRule( + Message out, PeerSelectRule rule, ResponseCallback callback, Options options) { String target = rule.select(getConnectionInfo()); - asyncSendToPeer(out, target, callback); + asyncSendToPeer(out, target, callback, options); } @Override diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java index 09d08ac20..68a6326c5 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java @@ -77,6 +77,10 @@ public void addSeq2CallBack(String seq, ResponseCallback callback) { seq2Callback.put(seq, callback); } + public void removeSeq(String seq) { + seq2Callback.remove(seq); + }; + private void addAvailablePeer(String host, ChannelHandlerContext ctx) { availablePeer.put(host, ctx); } @@ -89,6 +93,9 @@ private ResponseCallback getAndRemoveSeq(String seq) { @Override public void onConnect(ChannelHandlerContext ctx) { + logger.debug( + "onConnect in ChannelMsgHandler called, host : {}", + ChannelVersionNegotiation.getPeerHost(ctx)); queryNodeVersion(ctx); for (MsgHandler handle : msgConnectHandlerList) { handle.onConnect(ctx); @@ -97,6 +104,10 @@ public void onConnect(ChannelHandlerContext ctx) { @Override public void onMessage(ChannelHandlerContext ctx, Message msg) { + logger.debug( + "onMessage in ChannelMsgHandler called, host : {}, msgType : {}", + ChannelVersionNegotiation.getPeerHost(ctx), + (int) msg.getType()); ResponseCallback callback = getAndRemoveSeq(msg.getSeq()); if (callback != null) { @@ -126,6 +137,9 @@ public void onMessage(ChannelHandlerContext ctx, Message msg) { @Override public void onDisconnect(ChannelHandlerContext ctx) { + logger.debug( + "onDisconnect in ChannelMsgHandler called, host : {}", + ChannelVersionNegotiation.getPeerHost(ctx)); for (MsgHandler handle : msgDisconnectHandleList) { handle.onDisconnect(ctx); } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/model/Options.java b/src/main/java/org/fisco/bcos/sdk/channel/model/Options.java new file mode 100644 index 000000000..f30fb8103 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/channel/model/Options.java @@ -0,0 +1,13 @@ +package org.fisco.bcos.sdk.channel.model; + +public class Options { + public long timeout = 0; + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/client/JsonRpcService.java b/src/main/java/org/fisco/bcos/sdk/client/JsonRpcService.java index ff3ec86fb..e000d133e 100644 --- a/src/main/java/org/fisco/bcos/sdk/client/JsonRpcService.java +++ b/src/main/java/org/fisco/bcos/sdk/client/JsonRpcService.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.fisco.bcos.sdk.channel.Channel; import org.fisco.bcos.sdk.channel.ResponseCallback; +import org.fisco.bcos.sdk.channel.model.Options; import org.fisco.bcos.sdk.client.exceptions.ClientException; import org.fisco.bcos.sdk.client.protocol.response.JsonRpcResponse; import org.fisco.bcos.sdk.model.JsonRpcRequest; @@ -111,7 +112,8 @@ public void onResponse(Response response) { callback.onError(response); } } - }); + }, + new Options()); } public void asyncSendRequestToGroup(