From 711de0efbe12c2fd1da93f66cdf5e88b78ec4545 Mon Sep 17 00:00:00 2001 From: chaychen2005 <605966652@qq.com> Date: Tue, 21 Jul 2020 19:43:04 +0800 Subject: [PATCH] channel implement --- .gitignore | 1 + .../org/fisco/bcos/sdk/channel/Channel.java | 57 ++-- .../fisco/bcos/sdk/channel/ChannelImp.java | 284 ++++++++++++++++++ .../bcos/sdk/channel/ChannelMessageError.java | 34 +++ .../bcos/sdk/channel/ResponseCallback.java | 27 ++ .../org/fisco/bcos/sdk/config/Config.java | 2 +- 6 files changed, 375 insertions(+), 30 deletions(-) create mode 100644 src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java create mode 100644 src/main/java/org/fisco/bcos/sdk/channel/ChannelMessageError.java diff --git a/.gitignore b/.gitignore index cc788f2f1..a770cbbd5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .gradle/ .idea/ build +java-sdk.iml ## eclipse ## .classpath diff --git a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java index 9f58feed4..c5ffaffc9 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/Channel.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/Channel.java @@ -36,7 +36,7 @@ public interface Channel { * @return a channel instance */ static Channel build(String filepath) { - return null; + return new ChannelImp(filepath); } /** @@ -65,28 +65,33 @@ static Channel build(String filepath) { void addDisconnectHandler(MsgHandler handler); /** - * Synchronize interface, send a message to the given peer, and get the response + * Send a message to the given group * * @param out: Message to be sent - * @param peerIpPort: Remote ip:port information - * @param callback: The callback to be called when the response returns - * @return: Remote reply + * @param groupId: ID of the group receiving the message packet */ - Response sendToPeer(Message out, String peerIpPort); + void broadcastToGroup(Message out, String groupId); + /** - * Synchronize interface, send a message to the given group, and get the response + * Broadcast to all peer * * @param out: Message to be sent - * @param groupId: ID of the group receiving the message packet - * @param callback: The callback to be called when the response returns + */ + void broadcast(Message out); + + /** + * Synchronize interface, send a message to the given peer, and get the response + * + * @param out: Message to be sent + * @param peerIpPort: Remote ip:port information * @return: Remote reply */ - Response sendToGroup(Message out, String groupId); + Response sendToPeer(Message out, String peerIpPort); + /** * Synchronize interface, randomly select nodes to send messages * * @param out: Message to be sent - * @param callback: The callback to be called when the response returns * @return: Remote reply */ Response sendToRandom(Message out); @@ -100,28 +105,11 @@ static Channel build(String filepath) { */ void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback); - /** - * Send to a best peer with highest block height in Group - * - * @param out - * @param groupId - * @param callback - */ - void asyncSendToGroup(Message out, String groupId, ResponseCallback callback); - - /** - * Broadcast to all peer - * - * @param out - * @param callback - */ - void broadcast(Message out, ResponseCallback callback); - /** * Send to an random peer * * @param out - * @param callback + * @param callback response callback */ void asyncSendToRandom(Message out, ResponseCallback callback); @@ -136,4 +124,15 @@ public static String newSeq() { String seq = UUID.randomUUID().toString().replaceAll("-", ""); return seq; } + + /** + * Get available peer information + * + * @return List of available peer + */ + List getAvailablePeer(); + + Response sendToGroup(Message out, String groupId); + + void asyncSendToGroup(Message out, String groupId, ResponseCallback callback); } diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java new file mode 100644 index 000000000..c4387a2c3 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java @@ -0,0 +1,284 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.channel; + +import io.netty.channel.ChannelHandlerContext; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import org.fisco.bcos.sdk.config.Config; +import org.fisco.bcos.sdk.config.ConfigException; +import org.fisco.bcos.sdk.config.ConfigOption; +import org.fisco.bcos.sdk.model.Message; +import org.fisco.bcos.sdk.model.MsgType; +import org.fisco.bcos.sdk.model.Response; +import org.fisco.bcos.sdk.network.ConnectionInfo; +import org.fisco.bcos.sdk.network.MsgHandler; +import org.fisco.bcos.sdk.network.Network; +import org.fisco.bcos.sdk.network.NetworkImp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of channel. + * + * @author chaychen + */ +public class ChannelImp implements Channel { + + private static Logger logger = LoggerFactory.getLogger(ChannelImp.class); + + private List msgConnectHandlerList = new ArrayList<>(); + private List msgDisconnectHandleList = new ArrayList<>(); + private Map msgHandlers = new ConcurrentHashMap<>(); + private Map seq2Callback = new ConcurrentHashMap<>(); + + private MsgHandler msgHandler = + new MsgHandler() { + @Override + public void onConnect(ChannelHandlerContext ctx) { + // TODO: + // queryNodeVersion + // queryBlockNumber + // connection info -> available peers + for (MsgHandler handle : msgConnectHandlerList) { + handle.onConnect(ctx); + } + } + + @Override + public void onMessage(ChannelHandlerContext ctx, Message msg) { + // TODO: use msgHandlers to find special type to handle + ResponseCallback callback = (ResponseCallback) seq2Callback.get(msg.getSeq()); + + if (callback != null) { + if (callback.getTimeout() != null) { + callback.getTimeout().cancel(); + } + + logger.trace( + " receive response, seq: {}, result: {}, content: {}", + msg.getSeq(), + msg.getResult(), + new String(msg.getData())); + + Response response = new Response(); + if (msg.getResult() != 0) { + response.setErrorMessage("Response error"); + } + response.setErrorCode(msg.getResult()); + response.setMessageID(msg.getSeq()); + response.setContent(new String(msg.getData())); + callback.onResponse(response); + seq2Callback.remove(msg.getSeq()); + } else { + logger.debug("no callback"); + } + } + + @Override + public void onDisconnect(ChannelHandlerContext ctx) { + for (MsgHandler handle : msgDisconnectHandleList) { + handle.onDisconnect(ctx); + } + } + }; + + private Network network; + private Map> groupId2PeerIpPortList; + private Map availablePeer = new ConcurrentHashMap<>(); + + public ChannelImp(String filepath) { + try { + ConfigOption config = Config.load(filepath); + network = new NetworkImp(config, msgHandler); + } catch (ConfigException e) { + logger.debug("init channel error, {} ", e.getMessage()); + } + } + + @Override + public void addConnectHandler(MsgHandler handler) { + msgConnectHandlerList.add(handler); + } + + @Override + public void addMessageHandler(MsgType type, MsgHandler handler) { + msgHandlers.put(type, handler); + } + + @Override + public void addDisconnectHandler(MsgHandler handler) { + msgDisconnectHandleList.add(handler); + } + + public void setGroupId2PeerIpPortList(Map> groupId2PeerIpPortList) { + this.groupId2PeerIpPortList = groupId2PeerIpPortList; + } + + @Override + public void broadcastToGroup(Message out, String groupId) { + List peerIpPortList = groupId2PeerIpPortList.get(groupId); + for (String peerIpPort : peerIpPortList) { + if (availablePeer.containsKey(peerIpPort)) { + sendToPeer(out, peerIpPort); + } + } + } + + @Override + public void broadcast(Message out) { + availablePeer.forEach( + (peer, ctx) -> { + ctx.writeAndFlush(out); + logger.debug("send message to {} success ", peer); + }); + } + + @Override + public Response sendToPeer(Message out, String peerIpPort) { + class Callback extends ResponseCallback { + public transient Response retResponse; + public transient Semaphore semaphore = new Semaphore(1, true); + + Callback() { + try { + semaphore.acquire(1); + } catch (InterruptedException e) { + logger.error("error :", e); + Thread.currentThread().interrupt(); + } + } + + @Override + public void onResponse(Response response) { + retResponse = response; + + if (retResponse != null && retResponse.getContent() != null) { + logger.debug("response: {}", retResponse.getContent()); + } else { + logger.error("response is null"); + } + + semaphore.release(); + } + } + + Callback callback = new Callback(); + asyncSendToPeer(out, peerIpPort, callback); + try { + callback.semaphore.acquire(1); + } catch (InterruptedException e) { + logger.error("system error:", e); + Thread.currentThread().interrupt(); + } + + return callback.retResponse; + } + + @Override + public Response sendToRandom(Message out) { + class Callback extends ResponseCallback { + public transient Response retResponse; + public transient Semaphore semaphore = new Semaphore(1, true); + + Callback() { + try { + semaphore.acquire(1); + } catch (InterruptedException e) { + logger.error("error :", e); + Thread.currentThread().interrupt(); + } + } + + @Override + public void onResponse(Response response) { + retResponse = response; + + if (retResponse != null && retResponse.getContent() != null) { + logger.debug("response: {}", retResponse.getContent()); + } else { + logger.error("response is null"); + } + + semaphore.release(); + } + } + + Callback callback = new Callback(); + asyncSendToRandom(out, callback); + try { + callback.semaphore.acquire(1); + } catch (InterruptedException e) { + logger.error("system error:", e); + Thread.currentThread().interrupt(); + } + + return callback.retResponse; + } + + @Override + public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback) { + seq2Callback.put(out.getSeq(), callback); + availablePeer.forEach( + (peer, ctx) -> { + if (peer.equals(peerIpPort)) { + ctx.writeAndFlush(out); + logger.debug("send message to {} success ", peer); + } + }); + } + + @Override + public void asyncSendToRandom(Message out, ResponseCallback callback) { + List peerList = getAvailablePeer(); + int random = new SecureRandom().nextInt(peerList.size()); + String peerIpPort = peerList.get(random); + logger.debug("send message to random peer {} ", peerIpPort); + asyncSendToPeer(out, peerIpPort, callback); + } + + @Override + public List getConnectionInfo() { + return network.getConnectionInfo(); + } + + @Override + public List getAvailablePeer() { + List peerList = new ArrayList<>(); + availablePeer.forEach( + (peer, ctx) -> { + peerList.add(peer); + }); + return peerList; + } + + // TODO: remove + @Override + public Response sendToGroup(Message out, String groupId) { + Response response = new Response(); + return response; + } + + // TODO: remove + @Override + public void asyncSendToGroup(Message out, String groupId, ResponseCallback callback) { + return; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ChannelMessageError.java b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMessageError.java new file mode 100644 index 000000000..c7e9f6179 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/channel/ChannelMessageError.java @@ -0,0 +1,34 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.channel; + +public enum ChannelMessageError { + MESSAGE_TIMEOUT(102); // timeout + + private int error; + + private ChannelMessageError(int error) { + this.setError(error); + } + + public int getError() { + return error; + } + + public void setError(int error) { + this.error = error; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/channel/ResponseCallback.java b/src/main/java/org/fisco/bcos/sdk/channel/ResponseCallback.java index b335f5598..f8a576510 100644 --- a/src/main/java/org/fisco/bcos/sdk/channel/ResponseCallback.java +++ b/src/main/java/org/fisco/bcos/sdk/channel/ResponseCallback.java @@ -15,15 +15,42 @@ package org.fisco.bcos.sdk.channel; +import io.netty.util.Timeout; import org.fisco.bcos.sdk.model.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** ResponseCallback is to define a callback to handle response from node. */ public abstract class ResponseCallback { + private static Logger logger = LoggerFactory.getLogger(ResponseCallback.class); + + private Timeout timeout; + /** * OnResponse * * @param response */ public abstract void onResponse(Response response); + + public void onTimeout() { + logger.error("Processing message timeout:{}"); + + Response response = new Response(); + response.setErrorCode(ChannelMessageError.MESSAGE_TIMEOUT.getError()); + response.setErrorMessage("Processing message timeout"); + + response.setContent(""); + + onResponse(response); + } + + public Timeout getTimeout() { + return timeout; + } + + public void setTimeout(Timeout timeout) { + this.timeout = timeout; + } } diff --git a/src/main/java/org/fisco/bcos/sdk/config/Config.java b/src/main/java/org/fisco/bcos/sdk/config/Config.java index 4a59ea4c4..ce5372897 100644 --- a/src/main/java/org/fisco/bcos/sdk/config/Config.java +++ b/src/main/java/org/fisco/bcos/sdk/config/Config.java @@ -33,7 +33,7 @@ public class Config { * @return ConfigOption * @throws IOException */ - static ConfigOption load(String yamlConfigFile) throws ConfigException { + public static ConfigOption load(String yamlConfigFile) throws ConfigException { // Load a yaml config file to an java object ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); mapper.findAndRegisterModules();