Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2014-2020 [fisco-dev]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.fisco.bcos.sdk.channel;

import io.netty.channel.ChannelHandlerContext;
import org.fisco.bcos.sdk.config.ConfigException;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChannelTest {
@Test
public void testConnect() throws ConfigException {
Logger logger = LoggerFactory.getLogger(ChannelImp.class);
Channel channel = Channel.build("src/integration-test/resources/config-example.yaml");
class TestMsgHandler implements MsgHandler {
@Override
public void onConnect(ChannelHandlerContext ctx) {
logger.info("OnConnect in ChannelTest called: "+ctx.channel().remoteAddress());
}
@Override
public void onMessage(ChannelHandlerContext ctx, Message msg) {
logger.info("onMessage in ChannelTest called: "+ctx.channel().remoteAddress());
}
@Override
public void onDisconnect(ChannelHandlerContext ctx) {
logger.info("onDisconnect in ChannelTest called: "+ctx.channel().remoteAddress());
}
}
TestMsgHandler testMsgHandler = new TestMsgHandler();
channel.addConnectHandler(testMsgHandler);
channel.addMessageHandler(MsgType.CHANNEL_RPC_REQUEST, testMsgHandler);
channel.addDisconnectHandler(testMsgHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can add two interfaces "void start()" and "void stop()" in channel.

}
}
12 changes: 9 additions & 3 deletions src/main/java/org/fisco/bcos/sdk/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.fisco.bcos.sdk.channel;

import java.util.List;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
Expand Down Expand Up @@ -110,25 +111,30 @@ static Channel build(String filepath) {
* @param out: Message to be sent
* @param peerIpPort: Remote ip:port information
* @param callback: Response callback
* @param options: Include timeout
*/
void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback);
void asyncSendToPeer(
Message out, String peerIpPort, ResponseCallback callback, Options options);

/**
* Asynchronous interface, send to an random peer
*
* @param out: Message to be sent
* @param callback: Response callback
* @param options: Include timeout
*/
void asyncSendToRandom(Message out, ResponseCallback callback);
void asyncSendToRandom(Message out, ResponseCallback callback, Options options);

/**
* Asynchronous interface, send message to peer select by client`s rule
*
* @param out: Message to be sent
* @param rule: Rule set by client
* @param callback: Response callback
* @param options: Include timeout
*/
void asyncSendToPeerByRule(Message out, PeerSelectRule rule, ResponseCallback callback);
void asyncSendToPeerByRule(
Message out, PeerSelectRule rule, ResponseCallback callback, Options options);

/**
* Get connection information
Expand Down
49 changes: 36 additions & 13 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
package org.fisco.bcos.sdk.channel;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.config.Config;
import org.fisco.bcos.sdk.config.ConfigException;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.network.ConnectionInfo;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.fisco.bcos.sdk.network.Network;
import org.fisco.bcos.sdk.network.NetworkImp;
import org.fisco.bcos.sdk.network.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,14 +48,18 @@ public class ChannelImp implements Channel {
private ChannelMsgHandler msgHandler;
private Network network;
private Map<String, List<String>> groupId2PeerIpPortList; // upper module settings are required
private Timer timeoutHandler = new HashedWheelTimer();

public ChannelImp(String filepath) {
try {
ConfigOption config = Config.load(filepath);
msgHandler = new ChannelMsgHandler();
network = new NetworkImp(config, msgHandler);
network.start();
} catch (ConfigException e) {
logger.error("init channel error, {} ", e.getMessage());
logger.error("init channel config error, {} ", e.getMessage());
} catch (NetworkException e) {
logger.error("init channel network error, {} ", e.getMessage());
}
}

Expand Down Expand Up @@ -126,7 +133,7 @@ public void onResponse(Response response) {
}

Callback callback = new Callback();
asyncSendToPeer(out, peerIpPort, callback);
asyncSendToPeer(out, peerIpPort, callback, new Options());
try {
callback.semaphore.acquire(1);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -167,7 +174,7 @@ public void onResponse(Response response) {
}

Callback callback = new Callback();
asyncSendToRandom(out, callback);
asyncSendToRandom(out, callback, new Options());
try {
callback.semaphore.acquire(1);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -208,7 +215,7 @@ public void onResponse(Response response) {
}

Callback callback = new Callback();
asyncSendToPeerByRule(out, rule, callback);
asyncSendToPeerByRule(out, rule, callback, new Options());
try {
callback.semaphore.acquire(1);
} catch (InterruptedException e) {
Expand All @@ -220,9 +227,24 @@ public void onResponse(Response response) {
}

@Override
public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback) {
public void asyncSendToPeer(
Message out, String peerIpPort, ResponseCallback callback, Options options) {
msgHandler.addSeq2CallBack(out.getSeq(), callback);
ChannelHandlerContext ctx = msgHandler.getAvailablePeer().get(peerIpPort);
if (options.getTimeout() > 0) {
callback.setTimeout(
timeoutHandler.newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) {
// handle timer
callback.onTimeout();
msgHandler.removeSeq(out.getSeq());
}
},
options.getTimeout(),
TimeUnit.MILLISECONDS));
}
if (ctx != null) {
ctx.writeAndFlush(out);
logger.debug("send message to {} success ", peerIpPort);
Expand All @@ -232,18 +254,19 @@ public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback cal
}

@Override
public void asyncSendToRandom(Message out, ResponseCallback callback) {
public void asyncSendToRandom(Message out, ResponseCallback callback, Options options) {
List<String> peerList = getAvailablePeer();
int random = (int) (Math.random() * (peerList.size()));
String peerIpPort = peerList.get(random);
logger.debug("send message to random peer {} ", peerIpPort);
asyncSendToPeer(out, peerIpPort, callback);
asyncSendToPeer(out, peerIpPort, callback, options);
}

@Override
public void asyncSendToPeerByRule(Message out, PeerSelectRule rule, ResponseCallback callback) {
public void asyncSendToPeerByRule(
Message out, PeerSelectRule rule, ResponseCallback callback, Options options) {
String target = rule.select(getConnectionInfo());
asyncSendToPeer(out, target, callback);
asyncSendToPeer(out, target, callback, options);
}

@Override
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelMsgHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public void addSeq2CallBack(String seq, ResponseCallback callback) {
seq2Callback.put(seq, callback);
}

public void removeSeq(String seq) {
seq2Callback.remove(seq);
};

private void addAvailablePeer(String host, ChannelHandlerContext ctx) {
availablePeer.put(host, ctx);
}
Expand All @@ -89,6 +93,9 @@ private ResponseCallback getAndRemoveSeq(String seq) {

@Override
public void onConnect(ChannelHandlerContext ctx) {
logger.debug(
"onConnect in ChannelMsgHandler called, host : {}",
ChannelVersionNegotiation.getPeerHost(ctx));
queryNodeVersion(ctx);
for (MsgHandler handle : msgConnectHandlerList) {
handle.onConnect(ctx);
Expand All @@ -97,6 +104,10 @@ public void onConnect(ChannelHandlerContext ctx) {

@Override
public void onMessage(ChannelHandlerContext ctx, Message msg) {
logger.debug(
"onMessage in ChannelMsgHandler called, host : {}, msgType : {}",
ChannelVersionNegotiation.getPeerHost(ctx),
(int) msg.getType());
ResponseCallback callback = getAndRemoveSeq(msg.getSeq());

if (callback != null) {
Expand Down Expand Up @@ -126,6 +137,9 @@ public void onMessage(ChannelHandlerContext ctx, Message msg) {

@Override
public void onDisconnect(ChannelHandlerContext ctx) {
logger.debug(
"onDisconnect in ChannelMsgHandler called, host : {}",
ChannelVersionNegotiation.getPeerHost(ctx));
for (MsgHandler handle : msgDisconnectHandleList) {
handle.onDisconnect(ctx);
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/channel/model/Options.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.fisco.bcos.sdk.channel.model;

public class Options {
public long timeout = 0;

public long getTimeout() {
return timeout;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}
}
4 changes: 3 additions & 1 deletion src/main/java/org/fisco/bcos/sdk/client/JsonRpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.channel.model.Options;
import org.fisco.bcos.sdk.client.exceptions.ClientException;
import org.fisco.bcos.sdk.client.protocol.response.JsonRpcResponse;
import org.fisco.bcos.sdk.model.JsonRpcRequest;
Expand Down Expand Up @@ -111,7 +112,8 @@ public void onResponse(Response response) {
callback.onError(response);
}
}
});
},
new Options());
}

public <T extends JsonRpcResponse> void asyncSendRequestToGroup(
Expand Down