Skip to content

Commit

Permalink
bug fix: getsubscribers verify topic bug (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
caseone committed Mar 15, 2021
1 parent a6e7c16 commit 692b146
Showing 1 changed file with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.fisco.bcos.sdk.amop.AmopMsgOut;
import org.fisco.bcos.sdk.amop.AmopResponse;
import org.fisco.bcos.sdk.amop.topic.AmopMsgIn;
import org.fisco.bcos.sdk.amop.topic.TopicManager;
import org.fisco.bcos.sdk.amop.topic.TopicType;
import org.fisco.bcos.sdk.client.protocol.response.NodeInfo;
import org.fisco.bcos.sdk.client.protocol.response.Peers;
Expand Down Expand Up @@ -115,18 +116,21 @@ public Set<String> getSubVerifyTopics() {
}

public Set<String> getSubscribers(String topic) {
Integer groupId = Integer.parseInt(WeEvent.DEFAULT_GROUP_ID);
Integer groupId = Integer.parseInt(WeEvent.DEFAULT_GROUP_ID);
Set<String> subscriberIPs = new HashSet<>();
Set<String> subscriberNodeIds = new HashSet<>();

try {
List<PeerInfo> peers = this.bcosSDK.getClient(groupId).getPeers().getPeers();
log.info("getSubscribers: peers {}, {}", topic, peers);
for (Peers.PeerInfo peer : peers){
if(peer.getTopic().contains(topic)){
subscriberIPs.add(peer.getIpAndPort());
subscriberNodeIds.add(peer.getNodeID());
log.info("subscribers peer {}, {}", topic, peer.getNodeID(), peer.getIpAndPort());
for (Peers.PeerInfo peer : peers) {
for (String subTopic : peer.getTopic()) {
if (matchTopic(subTopic, topic)) {
subscriberIPs.add(peer.getIpAndPort());
subscriberNodeIds.add(peer.getNodeID());
log.info("subscribers peer {}, {}", topic, peer.getNodeID(), peer.getIpAndPort());
break;
}
}
}
} catch (Exception e) {
Expand All @@ -136,7 +140,7 @@ public Set<String> getSubscribers(String topic) {

List<String> connectedNodes = this.bcosSDK.getConfig().getNetworkConfig().getPeers();
log.info("getSubscribers: nodes {}", connectedNodes);
for (String nodeIp : connectedNodes){
for (String nodeIp : connectedNodes) {
NodeInfo.NodeInformation nodeInfo = null;
try {
nodeInfo = this.bcosSDK.getClient(groupId).getNodeInfo(nodeIp).getNodeInfo();
Expand All @@ -145,10 +149,13 @@ public Set<String> getSubscribers(String topic) {
continue;
}
log.info("getSubscribers: node {}, {}, {}", topic, nodeIp, nodeInfo);
if(nodeInfo.getTopic().contains(topic)){
subscriberIPs.add(nodeInfo.getIpAndPort());
subscriberNodeIds.add(nodeInfo.getNodeID());
log.info("subscribers node {}, {}", topic, nodeInfo.getNodeID(), nodeInfo.getIpAndPort());
for (String subTopic : nodeInfo.getTopic()) {
if (matchTopic(subTopic, topic)) {
subscriberIPs.add(nodeInfo.getIpAndPort());
subscriberNodeIds.add(nodeInfo.getNodeID());
log.info("subscribers node {}, {}", topic, nodeInfo.getNodeID(), nodeInfo.getIpAndPort());
}
break;
}
}
log.info("getSubscribers:{} {}", subscriberIPs, subscriberNodeIds);
Expand Down Expand Up @@ -404,7 +411,7 @@ public String switchTopic(String topic) throws BrokerException {
public FileChunksMeta getNewFileChunksMeta(FileChunksMeta fileChunksMeta) throws BrokerException {
// get old topic from new topic
String topic = null;
for(Map.Entry<String, String> entry : this.old2NewTopic.entrySet()) {
for (Map.Entry<String, String> entry : this.old2NewTopic.entrySet()) {
if (entry.getValue().equals(fileChunksMeta.getTopic())) {
topic = entry.getKey();
}
Expand Down Expand Up @@ -461,7 +468,7 @@ public AmopResponse sendEvent(String topic, FileEvent fileEvent) throws BrokerEx
break;
}
Thread.sleep(1000);
log.warn("send amop message failed, retry count: {}.", i+1);
log.warn("send amop message failed, retry count: {}.", i + 1);
}

sw.stop();
Expand Down Expand Up @@ -618,4 +625,16 @@ public byte[] receiveAmopMsg(AmopMsgIn msg) {
return DataTypeUtils.toChannelResponse(ErrorCode.UNKNOWN_ERROR);
}
}

public static boolean matchTopic(String pattenTopic, String topic) {
if (pattenTopic == topic) {
return true;
}
//verity topic
String verifyTopicPre = TopicManager.verifyChannelPrefix + TopicManager.topicNeedVerifyPrefix + topic;
if (pattenTopic.startsWith(verifyTopicPre)) {
return true;
}
return false;
}
}

0 comments on commit 692b146

Please sign in to comment.