Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.gradle/
.idea/
build
java-sdk.iml

## eclipse ##
.classpath
Expand Down
57 changes: 28 additions & 29 deletions src/main/java/org/fisco/bcos/sdk/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface Channel {
* @return a channel instance
*/
static Channel build(String filepath) {
return null;
return new ChannelImp(filepath);
}

/**
Expand Down Expand Up @@ -65,28 +65,33 @@ static Channel build(String filepath) {
void addDisconnectHandler(MsgHandler handler);

/**
* Synchronize interface, send a message to the given peer, and get the response
* Send a message to the given group
*
* @param out: Message to be sent
* @param peerIpPort: Remote ip:port information
* @param callback: The callback to be called when the response returns
* @return: Remote reply
* @param groupId: ID of the group receiving the message packet
*/
Response sendToPeer(Message out, String peerIpPort);
void broadcastToGroup(Message out, String groupId);

/**
* Synchronize interface, send a message to the given group, and get the response
* Broadcast to all peer
*
* @param out: Message to be sent
* @param groupId: ID of the group receiving the message packet
* @param callback: The callback to be called when the response returns
*/
void broadcast(Message out);

/**
* Synchronize interface, send a message to the given peer, and get the response
*
* @param out: Message to be sent
* @param peerIpPort: Remote ip:port information
* @return: Remote reply
*/
Response sendToGroup(Message out, String groupId);
Response sendToPeer(Message out, String peerIpPort);

/**
* Synchronize interface, randomly select nodes to send messages
*
* @param out: Message to be sent
* @param callback: The callback to be called when the response returns
* @return: Remote reply
*/
Response sendToRandom(Message out);
Expand All @@ -100,28 +105,11 @@ static Channel build(String filepath) {
*/
void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback);

/**
* Send to a best peer with highest block height in Group
*
* @param out
* @param groupId
* @param callback
*/
void asyncSendToGroup(Message out, String groupId, ResponseCallback callback);

/**
* Broadcast to all peer
*
* @param out
* @param callback
*/
void broadcast(Message out, ResponseCallback callback);

/**
* Send to an random peer
*
* @param out
* @param callback
* @param callback response callback
*/
void asyncSendToRandom(Message out, ResponseCallback callback);

Expand All @@ -136,4 +124,15 @@ public static String newSeq() {
String seq = UUID.randomUUID().toString().replaceAll("-", "");
return seq;
}

/**
* Get available peer information
*
* @return List of available peer
*/
List<String> getAvailablePeer();

Response sendToGroup(Message out, String groupId);

void asyncSendToGroup(Message out, String groupId, ResponseCallback callback);
}
284 changes: 284 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
/*
* Copyright 2014-2020 [fisco-dev]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.fisco.bcos.sdk.channel;

import io.netty.channel.ChannelHandlerContext;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.fisco.bcos.sdk.config.Config;
import org.fisco.bcos.sdk.config.ConfigException;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.network.ConnectionInfo;
import org.fisco.bcos.sdk.network.MsgHandler;
import org.fisco.bcos.sdk.network.Network;
import org.fisco.bcos.sdk.network.NetworkImp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of channel.
*
* @author chaychen
*/
public class ChannelImp implements Channel {

private static Logger logger = LoggerFactory.getLogger(ChannelImp.class);

private List<MsgHandler> msgConnectHandlerList = new ArrayList<>();
private List<MsgHandler> msgDisconnectHandleList = new ArrayList<>();
private Map<MsgType, MsgHandler> msgHandlers = new ConcurrentHashMap<>();
private Map<String, Object> seq2Callback = new ConcurrentHashMap<>();

private MsgHandler msgHandler =
new MsgHandler() {
@Override
public void onConnect(ChannelHandlerContext ctx) {
// TODO:
// queryNodeVersion
// queryBlockNumber
// connection info -> available peers
for (MsgHandler handle : msgConnectHandlerList) {
handle.onConnect(ctx);
}
}

@Override
public void onMessage(ChannelHandlerContext ctx, Message msg) {
// TODO: use msgHandlers to find special type to handle
ResponseCallback callback = (ResponseCallback) seq2Callback.get(msg.getSeq());

if (callback != null) {
if (callback.getTimeout() != null) {
callback.getTimeout().cancel();
}

logger.trace(
" receive response, seq: {}, result: {}, content: {}",
msg.getSeq(),
msg.getResult(),
new String(msg.getData()));

Response response = new Response();
if (msg.getResult() != 0) {
response.setErrorMessage("Response error");
}
response.setErrorCode(msg.getResult());
response.setMessageID(msg.getSeq());
response.setContent(new String(msg.getData()));
callback.onResponse(response);
seq2Callback.remove(msg.getSeq());
} else {
logger.debug("no callback");
}
}

@Override
public void onDisconnect(ChannelHandlerContext ctx) {
for (MsgHandler handle : msgDisconnectHandleList) {
handle.onDisconnect(ctx);
}
}
};

private Network network;
private Map<String, List<String>> groupId2PeerIpPortList;
private Map<String, ChannelHandlerContext> availablePeer = new ConcurrentHashMap<>();

public ChannelImp(String filepath) {
try {
ConfigOption config = Config.load(filepath);
network = new NetworkImp(config, msgHandler);
} catch (ConfigException e) {
logger.debug("init channel error, {} ", e.getMessage());
}
}

@Override
public void addConnectHandler(MsgHandler handler) {
msgConnectHandlerList.add(handler);
}

@Override
public void addMessageHandler(MsgType type, MsgHandler handler) {
msgHandlers.put(type, handler);
}

@Override
public void addDisconnectHandler(MsgHandler handler) {
msgDisconnectHandleList.add(handler);
}

public void setGroupId2PeerIpPortList(Map<String, List<String>> groupId2PeerIpPortList) {
this.groupId2PeerIpPortList = groupId2PeerIpPortList;
}

@Override
public void broadcastToGroup(Message out, String groupId) {
List<String> peerIpPortList = groupId2PeerIpPortList.get(groupId);
for (String peerIpPort : peerIpPortList) {
if (availablePeer.containsKey(peerIpPort)) {
sendToPeer(out, peerIpPort);
}
}
}

@Override
public void broadcast(Message out) {
availablePeer.forEach(
(peer, ctx) -> {
ctx.writeAndFlush(out);
logger.debug("send message to {} success ", peer);
});
}

@Override
public Response sendToPeer(Message out, String peerIpPort) {
class Callback extends ResponseCallback {
public transient Response retResponse;
public transient Semaphore semaphore = new Semaphore(1, true);

Callback() {
try {
semaphore.acquire(1);
} catch (InterruptedException e) {
logger.error("error :", e);
Thread.currentThread().interrupt();
}
}

@Override
public void onResponse(Response response) {
retResponse = response;

if (retResponse != null && retResponse.getContent() != null) {
logger.debug("response: {}", retResponse.getContent());
} else {
logger.error("response is null");
}

semaphore.release();
}
}

Callback callback = new Callback();
asyncSendToPeer(out, peerIpPort, callback);
try {
callback.semaphore.acquire(1);
} catch (InterruptedException e) {
logger.error("system error:", e);
Thread.currentThread().interrupt();
}

return callback.retResponse;
}

@Override
public Response sendToRandom(Message out) {
class Callback extends ResponseCallback {
public transient Response retResponse;
public transient Semaphore semaphore = new Semaphore(1, true);

Callback() {
try {
semaphore.acquire(1);
} catch (InterruptedException e) {
logger.error("error :", e);
Thread.currentThread().interrupt();
}
}

@Override
public void onResponse(Response response) {
retResponse = response;

if (retResponse != null && retResponse.getContent() != null) {
logger.debug("response: {}", retResponse.getContent());
} else {
logger.error("response is null");
}

semaphore.release();
}
}

Callback callback = new Callback();
asyncSendToRandom(out, callback);
try {
callback.semaphore.acquire(1);
} catch (InterruptedException e) {
logger.error("system error:", e);
Thread.currentThread().interrupt();
}

return callback.retResponse;
}

@Override
public void asyncSendToPeer(Message out, String peerIpPort, ResponseCallback callback) {
seq2Callback.put(out.getSeq(), callback);
availablePeer.forEach(
(peer, ctx) -> {
if (peer.equals(peerIpPort)) {
ctx.writeAndFlush(out);
logger.debug("send message to {} success ", peer);
}
});
}

@Override
public void asyncSendToRandom(Message out, ResponseCallback callback) {
List<String> peerList = getAvailablePeer();
int random = new SecureRandom().nextInt(peerList.size());
String peerIpPort = peerList.get(random);
logger.debug("send message to random peer {} ", peerIpPort);
asyncSendToPeer(out, peerIpPort, callback);
}

@Override
public List<ConnectionInfo> getConnectionInfo() {
return network.getConnectionInfo();
}

@Override
public List<String> getAvailablePeer() {
List<String> peerList = new ArrayList<>();
availablePeer.forEach(
(peer, ctx) -> {
peerList.add(peer);
});
return peerList;
}

// TODO: remove
@Override
public Response sendToGroup(Message out, String groupId) {
Response response = new Response();
return response;
}

// TODO: remove
@Override
public void asyncSendToGroup(Message out, String groupId, ResponseCallback callback) {
return;
}
}
Loading