Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ci_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ LOG_INFO() {
check_basic()
{
# check code format
bash gradlew verifyGoogleJavaFormat
# bash gradlew verifyGoogleJavaFormat
# build
bash gradlew build --info
}
Expand Down
17 changes: 3 additions & 14 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Apply the java-library plugin to add support for Java Library
plugins {
id 'com.github.sherter.google-java-format' version '0.8'
id 'maven-publish'
}
println("Notice: current gradle version is " + gradle.gradleVersion)
Expand All @@ -20,10 +19,10 @@ ext {
javapoetVersion = "1.7.0"
picocliVersion = "3.6.0"
nettyVersion = "4.1.50.Final"
nettySMSSLContextVersion = "1.1.0"
nettySMSSLContextVersion = "1.2.0"
toml4jVersion = "0.7.2"
bcprovJDK15onVersion = "1.60"
keyMiniToolkit = "1.0.0"
keyMiniToolkit = "1.0.2-SNAPSHOT"

solcJVersion = "0.4.25.1"
//solcJVersion = "0.5.2.0"
Expand All @@ -38,7 +37,7 @@ ext {
// integrationTest.mustRunAfter test
allprojects {
group = 'org.fisco-bcos.java-sdk'
version = '2.6.1-rc1'
version = '2.6.1-SNAPSHOT'
apply plugin: 'maven'
apply plugin: 'maven-publish'
apply plugin: 'idea'
Expand Down Expand Up @@ -186,16 +185,6 @@ dependencies {
testCompile ("org.fisco-bcos:solcJ:${solcJVersion}")
testCompile ("com.google.guava:guava:${guavaVersion}")
}
googleJavaFormat {
options style: 'AOSP'
source = sourceSets*.allJava
include '**/*.java'
}

verifyGoogleJavaFormat {
source = sourceSets*.allJava
include '**/*.java'
}

javadoc {
options.addStringOption('Xdoclint:none', '-quiet')
Expand Down
14 changes: 1 addition & 13 deletions sdk-amop/src/main/java/org/fisco/bcos/sdk/amop/Amop.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.UUID;
import org.fisco.bcos.sdk.amop.topic.TopicManager;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.config.ConfigOption;
import org.fisco.bcos.sdk.crypto.keystore.KeyTool;

Expand Down Expand Up @@ -81,7 +80,7 @@ static Amop build(Channel channel, ConfigOption config) {
* @param content the sent message
* @param callback the callback that will be called when receive the AMOP response
*/
void sendAmopMsg(AmopMsgOut content, ResponseCallback callback);
void sendAmopMsg(AmopMsgOut content, AmopResponseCallback callback);

/**
* Send amop msg
Expand All @@ -97,14 +96,6 @@ static Amop build(Channel channel, ConfigOption config) {
*/
Set<String> getSubTopics();

/**
* Get list of subscribers to a topic
*
* @param topicName the topic you want to query
* @return List of subscribers
*/
List<String> getTopicSubscribers(String topicName);

/**
* set amop default callback
*
Expand All @@ -118,9 +109,6 @@ static Amop build(Channel channel, ConfigOption config) {
/** Stop. */
void stop();

/** If configured private topic, wait until finish verify */
void waitFinishPrivateTopicVerify();

/**
* generate message sequence string
*
Expand Down
23 changes: 11 additions & 12 deletions sdk-amop/src/main/java/org/fisco/bcos/sdk/amop/AmopImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.fisco.bcos.sdk.model.AmopMsg;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void unsubscribeTopic(String topicName) {
}

@Override
public void sendAmopMsg(AmopMsgOut content, ResponseCallback callback) {
public void sendAmopMsg(AmopMsgOut content, AmopResponseCallback callback) {
if (!topicManager.canSendTopicMsg(content)) {
logger.error(
"can not send this amop private msg out, you have not configured the public keys. topic:{}",
Expand All @@ -110,7 +111,15 @@ public void sendAmopMsg(AmopMsgOut content, ResponseCallback callback) {
msg.setData(content.getContent());
Options ops = new Options();
ops.setTimeout(content.getTimeout());
this.channel.asyncSendToRandom(msg, callback, ops);
ResponseCallback cb =
new ResponseCallback() {
@Override
public void onResponse(Response response) {
AmopResponse amopResponse = new AmopResponse(response);
callback.onResponse(amopResponse);
}
};
this.channel.asyncSendToRandom(msg, cb, ops);
logger.info(
"send amop msg to a random peer, seq{} topic{}", msg.getSeq(), content.getTopic());
}
Expand Down Expand Up @@ -141,11 +150,6 @@ public Set<String> getSubTopics() {
return topicManager.getTopicNames();
}

@Override
public List<String> getTopicSubscribers(String topicName) {
return null;
}

@Override
public void setCallback(AmopCallback cb) {
topicManager.setCallback(cb);
Expand All @@ -165,11 +169,6 @@ public void stop() {
unSubscribeAll();
}

@Override
public void waitFinishPrivateTopicVerify() {
// todo add wait function
}

private void unSubscribeAll() {
List<String> peers = this.channel.getAvailablePeer();
logger.info("unsubscribe all topics, inform {} peers", peers.size());
Expand Down
93 changes: 93 additions & 0 deletions sdk-amop/src/main/java/org/fisco/bcos/sdk/amop/AmopResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.fisco.bcos.sdk.amop;

import io.netty.channel.ChannelHandlerContext;
import org.fisco.bcos.sdk.amop.topic.AmopMsgHandler;
import org.fisco.bcos.sdk.amop.topic.AmopMsgIn;
import org.fisco.bcos.sdk.amop.topic.TopicType;
import org.fisco.bcos.sdk.model.AmopMsg;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmopResponse {
private static Logger logger = LoggerFactory.getLogger(AmopResponse.class);
private Integer errorCode;
private String errorMessage;
private String messageID;
private ChannelHandlerContext ctx;
private AmopMsgIn amopMsgIn;

public AmopResponse(Response response) {
this.setCtx(response.getCtx());
this.setErrorCode(response.getErrorCode());
this.setErrorMessage(response.getErrorMessage());
this.setMessageID(response.getMessageID());
if (response.getErrorCode().intValue() == 0) {
amopMsgIn = new AmopMsgIn();
Message msg = new Message();
msg.setSeq(response.getMessageID());
msg.setResult(response.getErrorCode());
msg.setType((short) MsgType.AMOP_RESPONSE.getType());
msg.setData(response.getContentBytes());
AmopMsg amopMsg = new AmopMsg(msg);
try {
amopMsg.decodeAmopBody(response.getContentBytes());
} catch (Exception e) {
logger.error("Receive an invalid amop response, seq:{}", response.getMessageID());
return;
}
amopMsgIn.setTopic(amopMsg.getTopic());
if (AmopMsgHandler.isPrivateTopic(amopMsg.getTopic())) {
amopMsgIn.setTopicType(TopicType.PRIVATE_TOPIC);
amopMsgIn.setTopic(AmopMsgHandler.removePrivateTopicPrefix(amopMsg.getTopic()));
}
amopMsgIn.setContent(amopMsg.getData());
amopMsgIn.setMessageID(response.getMessageID());
amopMsgIn.setCtx(response.getCtx());
amopMsgIn.setType((short) MsgType.AMOP_RESPONSE.getType());
amopMsgIn.setResult(response.getErrorCode());
}
}

public AmopMsgIn getAmopMsgIn() {
return amopMsgIn;
}

public void setAmopMsgIn(AmopMsgIn amopMsgIn) {
this.amopMsgIn = amopMsgIn;
}

public Integer getErrorCode() {
return errorCode;
}

public void setErrorCode(Integer errorCode) {
this.errorCode = errorCode;
}

public String getErrorMessage() {
return errorMessage;
}

public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}

public String getMessageID() {
return messageID;
}

public void setMessageID(String messageID) {
this.messageID = messageID;
}

public ChannelHandlerContext getCtx() {
return ctx;
}

public void setCtx(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.fisco.bcos.sdk.amop;

import io.netty.util.Timeout;

public abstract class AmopResponseCallback {
private Timeout timeout;

public Timeout getTimeout() {
return timeout;
}

public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}

public abstract void onResponse(AmopResponse response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.fisco.bcos.sdk.amop.topic;

import static org.fisco.bcos.sdk.amop.topic.TopicManager.topicNeedVerifyPrefix;
import static org.fisco.bcos.sdk.amop.topic.TopicManager.verifyChannelPrefix;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -235,6 +236,19 @@ private boolean isVerifyingPrivateTopic(AmopMsg amopMsg) {
amopMsg.getTopic().substring(0, verifyChannelPrefix.length()));
}

public static boolean isPrivateTopic(String topic) {
return topic.length() > topicNeedVerifyPrefix.length()
&& topicNeedVerifyPrefix.equals(topic.substring(0, topicNeedVerifyPrefix.length()));
}

public static String removePrivateTopicPrefix(String topic) {
if (isPrivateTopic(topic)) {
return topic.substring(topicNeedVerifyPrefix.length());
} else {
return topic;
}
}

private String getSimpleTopic(String fullTopic) {
return fullTopic.substring(verifyChannelPrefix.length(), fullTopic.length() - 33);
}
Expand Down Expand Up @@ -284,6 +298,10 @@ public void onAmopMsg(ChannelHandlerContext ctx, AmopMsg amopMsg) {
}
AmopMsgIn msgIn = new AmopMsgIn();
msgIn.setTopic(amopMsg.getTopic());
if (isPrivateTopic(amopMsg.getTopic())) {
msgIn.setTopic(removePrivateTopicPrefix(amopMsg.getTopic()));
msgIn.setTopicType(TopicType.PRIVATE_TOPIC);
}
msgIn.setMessageID(amopMsg.getSeq());
msgIn.setContent(amopMsg.getData());
msgIn.setResult(amopMsg.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class AmopMsgIn {
private String topic;
private Integer result;
protected Short type = 0;
private TopicType topicType = TopicType.NORMAL_TOPIC;
private ChannelHandlerContext ctx;

public String getMessageID() {
Expand Down Expand Up @@ -60,4 +61,12 @@ public Short getType() {
public void setType(Short type) {
this.type = type;
}

public TopicType getTopicType() {
return topicType;
}

public void setTopicType(TopicType topicType) {
this.topicType = topicType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import java.util.Random;
import org.fisco.bcos.sdk.amop.Amop;
import org.fisco.bcos.sdk.amop.AmopMsgOut;
import org.fisco.bcos.sdk.amop.AmopResponse;
import org.fisco.bcos.sdk.amop.AmopResponseCallback;
import org.fisco.bcos.sdk.amop.topic.TopicType;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.model.Response;

public class AmopMsgBuilder {

Expand All @@ -17,10 +17,10 @@ public void sendMsg(
out.setTimeout(5000);
out.setContent(getRandomBytes(contentLen));

ResponseCallback callback =
new ResponseCallback() {
AmopResponseCallback callback =
new AmopResponseCallback() {
@Override
public void onResponse(Response response) {
public void onResponse(AmopResponse response) {
collector.addResponse();
if (response.getErrorCode() != 0) {
System.out.println(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ public class PerformanceAmop {
private static final String senderConfig =
PerformanceAmop.class
.getClassLoader()
.getResource("config-sender-for-test.toml")
.getResource("amop/config-publisher-for-test.toml")
.getPath();
private static final String subscriberConfig =
PerformanceAmop.class
.getClassLoader()
.getResource("config-subscriber-for-test.toml")
.getResource("amop/config-subscriber-for-test.toml")
.getPath();
private static AtomicInteger sendedMsg = new AtomicInteger(0);
private static AmopMsgBuilder msgBuilder = new AmopMsgBuilder();

/** @param args count, qps, msgSize */
public static void main(String[] args) {
try {
Integer count = Integer.valueOf(args[0]);
Expand Down
Loading