From edaf1f25d8892216aace3a231f293a9a8b808828 Mon Sep 17 00:00:00 2001 From: octopus <912554887@qq.com> Date: Tue, 30 Nov 2021 12:50:44 +0800 Subject: [PATCH] add event sub impl --- .../bcos/sdk/codec/abi/tools/TopicTools.java | 17 +- .../org/fisco/bcos/sdk/model/EventLog.java | 90 +------ .../bcos/sdk/eventsub/EventLogParams.java | 227 ------------------ ...entCallback.java => EventSubCallback.java} | 5 +- .../bcos/sdk/eventsub/EventSubParams.java | 99 ++++++++ ...LogResponse.java => EventSubResponse.java} | 34 +-- ...odeRespStatus.java => EventSubStatus.java} | 14 +- .../bcos/sdk/eventsub/EventSubscribe.java | 23 +- .../bcos/sdk/eventsub/EventSubscribeImp.java | 103 +++++++- .../eventsub/exception/EventSubException.java | 30 --- .../fisco/bcos/sdk/service/GroupService.java | 72 ------ .../bcos/sdk/service/GroupServiceImpl.java | 206 ---------------- 12 files changed, 266 insertions(+), 654 deletions(-) delete mode 100644 sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java rename sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/{EventCallback.java => EventSubCallback.java} (86%) create mode 100644 sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubParams.java rename sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/{filter/EventLogResponse.java => EventSubResponse.java} (67%) rename sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/{filter/EventSubNodeRespStatus.java => EventSubStatus.java} (86%) delete mode 100644 sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java delete mode 100644 sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupService.java delete mode 100644 sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupServiceImpl.java diff --git a/sdk-codec/src/main/java/org/fisco/bcos/sdk/codec/abi/tools/TopicTools.java b/sdk-codec/src/main/java/org/fisco/bcos/sdk/codec/abi/tools/TopicTools.java index c5a78f229..9c89e02d3 100644 --- a/sdk-codec/src/main/java/org/fisco/bcos/sdk/codec/abi/tools/TopicTools.java +++ b/sdk-codec/src/main/java/org/fisco/bcos/sdk/codec/abi/tools/TopicTools.java @@ -1,6 +1,8 @@ package org.fisco.bcos.sdk.codec.abi.tools; import java.math.BigInteger; +import java.util.Objects; + import org.fisco.bcos.sdk.codec.abi.TypeEncoder; import org.fisco.bcos.sdk.codec.datatypes.Bytes; import org.fisco.bcos.sdk.crypto.CryptoSuite; @@ -10,7 +12,7 @@ public class TopicTools { public static final int MAX_NUM_TOPIC_EVENT_LOG = 4; - public static final String LATEST = "latest"; + public static final int TOPIC_LENGTH = 64; private final CryptoSuite cryptoSuite; @@ -22,6 +24,19 @@ public String integerToTopic(BigInteger i) { return Numeric.toHexStringWithPrefixZeroPadded(i, 64); } + public static boolean validTopic(String topic) { + if (Objects.isNull(topic)) { + return false; + } + + if (topic.startsWith("0x") || topic.startsWith("0X")) { + return topic.length() == (TOPIC_LENGTH + 2); + } + + return topic.length() == TOPIC_LENGTH; + } + + public String boolToTopic(boolean b) { if (b) { return Numeric.toHexStringWithPrefixZeroPadded(BigInteger.ONE, 64); diff --git a/sdk-core/src/main/java/org/fisco/bcos/sdk/model/EventLog.java b/sdk-core/src/main/java/org/fisco/bcos/sdk/model/EventLog.java index 266b9dca6..61d097f91 100644 --- a/sdk-core/src/main/java/org/fisco/bcos/sdk/model/EventLog.java +++ b/sdk-core/src/main/java/org/fisco/bcos/sdk/model/EventLog.java @@ -17,42 +17,35 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.math.BigInteger; import java.util.List; +import java.util.Objects; + import org.fisco.bcos.sdk.utils.Numeric; public class EventLog { - private boolean removed; private String logIndex; private String transactionIndex; private String transactionHash; - private String blockHash; private String blockNumber; private String address; private String data; - private String type; private List topics; public EventLog() {} public EventLog( - boolean removed, String logIndex, String transactionIndex, String transactionHash, - String blockHash, String blockNumber, String address, String data, - String type, List topics) { - this.removed = removed; this.logIndex = logIndex; this.transactionIndex = transactionIndex; this.transactionHash = transactionHash; - this.blockHash = blockHash; this.blockNumber = blockNumber; this.address = address; this.data = data; - this.type = type; this.topics = topics; } @@ -61,15 +54,6 @@ public EventLog(String data, List topics) { this.topics = topics; } - @JsonIgnore - public boolean isRemoved() { - return removed; - } - - public void setRemoved(boolean removed) { - this.removed = removed; - } - public BigInteger getLogIndex() { return convert(logIndex); } @@ -104,14 +88,6 @@ public void setTransactionHash(String transactionHash) { this.transactionHash = transactionHash; } - public String getBlockHash() { - return blockHash; - } - - public void setBlockHash(String blockHash) { - this.blockHash = blockHash; - } - public BigInteger getBlockNumber() { return convert(blockNumber); } @@ -141,15 +117,6 @@ public void setData(String data) { this.data = data; } - @JsonIgnore - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - public List getTopics() { return topics; } @@ -177,9 +144,6 @@ public boolean equals(Object o) { EventLog log = (EventLog) o; - if (isRemoved() != log.isRemoved()) { - return false; - } if (getLogIndexRaw() != null ? !getLogIndexRaw().equals(log.getLogIndexRaw()) : log.getLogIndexRaw() != null) { @@ -195,11 +159,6 @@ public boolean equals(Object o) { : log.getTransactionHash() != null) { return false; } - if (getBlockHash() != null - ? !getBlockHash().equals(log.getBlockHash()) - : log.getBlockHash() != null) { - return false; - } if (getBlockNumberRaw() != null ? !getBlockNumberRaw().equals(log.getBlockNumberRaw()) : log.getBlockNumberRaw() != null) { @@ -213,49 +172,24 @@ public boolean equals(Object o) { if (getData() != null ? !getData().equals(log.getData()) : log.getData() != null) { return false; } - if (getType() != null ? !getType().equals(log.getType()) : log.getType() != null) { - return false; - } return getTopics() != null ? getTopics().equals(log.getTopics()) : log.getTopics() == null; } @Override public int hashCode() { - int result = (isRemoved() ? 1 : 0); - result = 31 * result + (getLogIndexRaw() != null ? getLogIndexRaw().hashCode() : 0); - result = - 31 * result - + (getTransactionIndexRaw() != null - ? getTransactionIndexRaw().hashCode() - : 0); - result = 31 * result + (getTransactionHash() != null ? getTransactionHash().hashCode() : 0); - result = 31 * result + (getBlockHash() != null ? getBlockHash().hashCode() : 0); - result = 31 * result + (getBlockNumberRaw() != null ? getBlockNumberRaw().hashCode() : 0); - result = 31 * result + (getAddress() != null ? getAddress().hashCode() : 0); - result = 31 * result + (getData() != null ? getData().hashCode() : 0); - result = 31 * result + (getType() != null ? getType().hashCode() : 0); - result = 31 * result + (getTopics() != null ? getTopics().hashCode() : 0); - return result; + return Objects.hash(logIndex, transactionIndex, transactionHash, blockNumber, address, data, topics); } @Override public String toString() { - return "Log [logIndex=" - + logIndex - + ", transactionIndex=" - + transactionIndex - + ", transactionHash=" - + transactionHash - + ", blockHash=" - + blockHash - + ", blockNumber=" - + blockNumber - + ", address=" - + address - + ", data=" - + data - + ", topics=" - + topics - + "]"; + return "EventLog{" + + "logIndex='" + logIndex + '\'' + + ", transactionIndex='" + transactionIndex + '\'' + + ", transactionHash='" + transactionHash + '\'' + + ", blockNumber='" + blockNumber + '\'' + + ", address='" + address + '\'' + + ", data='" + data + '\'' + + ", topics=" + topics + + '}'; } } diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java deleted file mode 100644 index 4334d3b53..000000000 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2014-2020 [fisco-dev] - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * - */ - -package org.fisco.bcos.sdk.eventsub; - -import java.math.BigInteger; -import java.util.List; -import org.fisco.bcos.sdk.codec.abi.tools.TopicTools; -import org.fisco.bcos.sdk.utils.AddressUtils; -import org.fisco.bcos.sdk.utils.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EventLogParams { - private static Logger logger = LoggerFactory.getLogger(EventLogParams.class); - private String fromBlock; - private String toBlock; - private List addresses; - private List topics; - - public static Logger getLogger() { - return logger; - } - - public static void setLogger(Logger logger) { - EventLogParams.logger = logger; - } - - public String getFromBlock() { - return fromBlock; - } - - public void setFromBlock(String fromBlock) { - this.fromBlock = fromBlock; - } - - public String getToBlock() { - return toBlock; - } - - public void setToBlock(String toBlock) { - this.toBlock = toBlock; - } - - public List getAddresses() { - return addresses; - } - - public void setAddresses(List addresses) { - this.addresses = addresses; - } - - public List getTopics() { - return topics; - } - - public void setTopics(List topics) { - this.topics = topics; - } - - @Override - public String toString() { - return "EventLogFilterParams [fromBlock=" - + fromBlock - + ", toBlock=" - + toBlock - + ", addresses=" - + addresses - + ", topics=" - + topics - + "]"; - } - - public boolean checkAddresses() { - if (getAddresses() == null) { - return false; - } - for (String address : getAddresses()) { - // check if address valid - if (!AddressUtils.isValidAddress(address)) { - return false; - } - } - return true; - } - - /** - * check if topics valid - * - * @return - */ - private boolean checkTopics() { - - // topics - if ((getTopics() == null) || (getTopics().size() > TopicTools.MAX_NUM_TOPIC_EVENT_LOG)) { - return false; - } - - for (Object topic : getTopics()) { - if (topic == null) { - continue; - } - if (topic instanceof String) { - // if valid topic - if (((String) topic).isEmpty()) { - return false; - } - } else if (topic instanceof List) { - for (Object o : (List) topic) { - // if valid topic - if (((String) o).isEmpty()) { - return false; - } - } - } else { - return false; - } - } - - return true; - } - - private boolean checkToBlock(BigInteger blockNumber) { - - // fromBlock="latest" but toBlock is not - // range in (latest,10), blockNumber 1 ok, 6 ok, 11 fail - BigInteger toBlock = new BigInteger(getToBlock()); - return (blockNumber.compareTo(BigInteger.ONE) <= 0) - || (blockNumber.compareTo(BigInteger.ONE) > 0 - && (toBlock.compareTo(blockNumber)) > 0); - } - - private boolean checkFromBlock(BigInteger blockNumber) { - - // toBlock="latest" but fromBlock is not - // range in (10,latest), blockNumber 6 ok, 11 fail - BigInteger fromBlock = new BigInteger(getFromBlock()); - // fromBlock is bigger than block number of the blockchain - if (fromBlock.compareTo(BigInteger.ZERO) <= 0) { - return false; - } - - if (blockNumber.compareTo(BigInteger.ONE) > 0 && (fromBlock.compareTo(blockNumber) > 0)) { - logger.info( - " future block range request, from: {}, to: {}", getFromBlock(), getToBlock()); - } - - return true; - } - - private boolean checkFromToBlock(BigInteger blockNumber) throws NumberFormatException { - - // fromBlock and toBlock none is "latest" - // range in (10,20), blockNumber 6 ok, 11 ok, 25 ok - BigInteger fromBlock = new BigInteger(getFromBlock()); - BigInteger toBlock = new BigInteger(getToBlock()); - - if ((fromBlock.compareTo(BigInteger.ZERO) <= 0) || (fromBlock.compareTo(toBlock) > 0)) { - return false; - } else { - if (blockNumber.compareTo(BigInteger.ONE) > 0 - && (fromBlock.compareTo(blockNumber) > 0)) { - logger.info( - " future block range request, from: {}, to: {}", - getFromBlock(), - getToBlock()); - } - return true; - } - } - - /** - * check if valid fromBlock and toBlock - * - * @param blockNumber - * @return - */ - private boolean checkBlockRange(BigInteger blockNumber) { - - if (StringUtils.isEmpty(getFromBlock()) || StringUtils.isEmpty(getToBlock())) { - return false; - } - - boolean isValidBlockRange = true; - - try { - if (getFromBlock().equals(TopicTools.LATEST) - && !getToBlock().equals(TopicTools.LATEST)) { - // fromBlock="latest" but toBlock is not - isValidBlockRange = checkToBlock(blockNumber); - } else if (!getFromBlock().equals(TopicTools.LATEST) - && getToBlock().equals(TopicTools.LATEST)) { - // toBlock="latest" but fromBlock is not - isValidBlockRange = checkFromBlock(blockNumber); - } else if (!getFromBlock().equals(TopicTools.LATEST) - && !getToBlock().equals(TopicTools.LATEST)) { - isValidBlockRange = checkFromToBlock(blockNumber); - } - } catch (Exception e) { - // invalid blockNumber format string - isValidBlockRange = false; - } - - return isValidBlockRange; - } - - /** - * @param blockNumber block number of blockchain - * @return check 3 params - */ - @SuppressWarnings("unchecked") - public boolean checkParams(BigInteger blockNumber) { - return checkBlockRange(blockNumber) && checkAddresses() && checkTopics(); - } -} diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubCallback.java similarity index 86% rename from sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java rename to sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubCallback.java index 545eb1ad5..140fcbcc3 100644 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubCallback.java @@ -19,14 +19,15 @@ import org.fisco.bcos.sdk.model.EventLog; /** Event callback */ -public interface EventCallback { +public interface EventSubCallback { /** * onReceiveLog called when sdk receive any response of the target subscription. logs will be * parsed by the user through the ABI module. * + * @param eventSubId the event sub task id. * @param status the status that peer response to sdk. * @param logs logs from the message. */ - void onReceiveLog(int status, List logs); + void onReceiveLog(String eventSubId, int status, List logs); } diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubParams.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubParams.java new file mode 100644 index 000000000..6f9ca9084 --- /dev/null +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubParams.java @@ -0,0 +1,99 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import org.fisco.bcos.sdk.codec.abi.tools.TopicTools; + +public class EventSubParams { + + private BigInteger fromBlock = BigInteger.valueOf(-1); + private BigInteger toBlock = BigInteger.valueOf(-1); + private List addresses = new ArrayList<>(); + private List> topics = new ArrayList>() {{ add(null); add(null); add(null); add(null);}}; + + public BigInteger getFromBlock() { + return fromBlock; + } + + public void setFromBlock(BigInteger fromBlock) { + this.fromBlock = fromBlock; + } + + public BigInteger getToBlock() { + return toBlock; + } + + public void setToBlock(BigInteger toBlock) { + this.toBlock = toBlock; + } + + public List getAddresses() { + return addresses; + } + + public List> getTopics() { + return topics; + } + + public boolean addAddress(String addr) { + // TODO: check address valid + return this.addresses.add(addr); + } + + public boolean addTopic(int index, String topic) { + if (!TopicTools.validTopic(topic)) { + return false; + } + + if (index >= TopicTools.MAX_NUM_TOPIC_EVENT_LOG) { + return false; + } + + List strings = this.topics.get(index); + if (strings == null) { + strings = new ArrayList<>(); + this.topics.set(index, strings); + } + + strings.add(topic); + return true; + } + + /** + * @return check params + */ + @SuppressWarnings("unchecked") + public boolean checkParams() { + if (fromBlock.compareTo(BigInteger.ZERO) > 0 && toBlock.compareTo(BigInteger.ZERO) > 0) { + return fromBlock.compareTo(toBlock) <= 0; + } + return true; + } + + @Override + public String toString() { + return "EventLogParams{" + + "fromBlock=" + fromBlock + + ", toBlock=" + toBlock + + ", addresses=" + addresses + + ", topics=" + topics + + '}'; + } + +} diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubResponse.java similarity index 67% rename from sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java rename to sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubResponse.java index f211426c8..309dc0bdd 100644 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubResponse.java @@ -13,41 +13,45 @@ * */ -package org.fisco.bcos.sdk.eventsub.filter; +package org.fisco.bcos.sdk.eventsub; import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; import org.fisco.bcos.sdk.model.EventLog; -public class EventLogResponse { - private int result; - private String filterID; +public class EventSubResponse { + private int status; + private String id; + + @JsonProperty("result") private List logs; @Override public String toString() { return "EventLogResponse [result=" - + result - + ", filterID=" - + filterID + + status + + ", id=" + + id + ", logs=" + logs + "]"; } - public int getResult() { - return result; + public int getStatus() { + return status; } - public void setResult(int result) { - this.result = result; + public void setStatus(int status) { + this.status = status; } - public String getFilterID() { - return filterID; + public String getId() { + return id; } - public void setFilterID(String filterID) { - this.filterID = filterID; + public void setId(String id) { + this.id = id; } public List getLogs() { diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubStatus.java similarity index 86% rename from sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java rename to sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubStatus.java index 75da90faf..a97f170e0 100644 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubStatus.java @@ -13,9 +13,9 @@ * */ -package org.fisco.bcos.sdk.eventsub.filter; +package org.fisco.bcos.sdk.eventsub; -public enum EventSubNodeRespStatus { +public enum EventSubStatus { /** When make a subscribe of an event, the node would response the status. */ SUCCESS(0), PUSH_COMPLETED(1), @@ -32,7 +32,7 @@ public enum EventSubNodeRespStatus { private int status; - private EventSubNodeRespStatus(int status) { + private EventSubStatus(int status) { this.setStatus(status); } @@ -44,20 +44,20 @@ public void setStatus(int status) { this.status = status; } - public static EventSubNodeRespStatus fromIntStatus(int status) { - for (EventSubNodeRespStatus e : EventSubNodeRespStatus.values()) { + public static EventSubStatus fromIntStatus(int status) { + for (EventSubStatus e : EventSubStatus.values()) { if (e.getStatus() == status) { return e; } } - return EventSubNodeRespStatus.OTHER_ERROR; + return EventSubStatus.OTHER_ERROR; } public static String getDescMessage(int status) { return getDescMessage(fromIntStatus(status)); } - public static String getDescMessage(EventSubNodeRespStatus status) { + public static String getDescMessage(EventSubStatus status) { String desc; diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java index b2944073c..12d115e08 100644 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java @@ -18,6 +18,7 @@ import java.util.Set; import org.fisco.bcos.sdk.client.Client; import org.fisco.bcos.sdk.config.ConfigOption; +import org.fisco.bcos.sdk.jni.common.JniException; /** * Event subscribe interface. @@ -29,10 +30,12 @@ public interface EventSubscribe { * Create Event subscribe instance * * @param group group + * @param group configOption * @return EventSubscribe Object */ - static EventSubscribe build(String group, ConfigOption configOption) { - return new EventSubscribeImp(group, configOption); + static EventSubscribe build(String group, ConfigOption configOption) throws JniException { + Client client = Client.build(group, configOption); + return new EventSubscribeImp(client, configOption); } /** @@ -41,8 +44,8 @@ static EventSubscribe build(String group, ConfigOption configOption) { * @param client Client * @return EventSubscribe Object */ - static EventSubscribe build(Client client) { - return new EventSubscribeImp(client.getGroup(), client.getConfigOption()); + static EventSubscribe build(Client client) throws JniException { + return new EventSubscribeImp(client, client.getConfigOption()); } /** @@ -52,17 +55,19 @@ static EventSubscribe build(Client client) { * @param callback the EventCallback instance * @return registerId of event */ - String subscribeEvent(EventLogParams params, EventCallback callback); + void subscribeEvent(EventSubParams params, EventSubCallback callback); /** * Unsubscribe events * - * @param id the ID of event subscribe task - * @param callback the EventCallback instance + * @param eventSubId the ID of event subscribe task */ - void unsubscribeEvent(String id, EventCallback callback); + void unsubscribeEvent(String eventSubId); - /** @return */ + /** + * get all events subscribed by clients + * @return + */ Set getAllSubscribedEvents(); /** Start */ diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java index caff0ff90..03a516b51 100644 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java +++ b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java @@ -16,29 +16,118 @@ package org.fisco.bcos.sdk.eventsub; import java.util.Set; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.fisco.bcos.sdk.client.Client; import org.fisco.bcos.sdk.config.ConfigOption; +import org.fisco.bcos.sdk.crypto.CryptoSuite; +import org.fisco.bcos.sdk.jni.common.JniException; +import org.fisco.bcos.sdk.jni.common.Response; +import org.fisco.bcos.sdk.jni.event.EventSubscribeCallback; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventSubscribeImp implements EventSubscribe { - public EventSubscribeImp(String group, ConfigOption configOption) { - // TODO: + + private static final Logger logger = LoggerFactory.getLogger(EventSubscribeImp.class); + + private String groupId; + private ConfigOption configOption; + private CryptoSuite cryptoSuite; + private ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper(); + private org.fisco.bcos.sdk.jni.event.EventSubscribe eventSubscribe; + + public EventSubscribeImp(Client client, ConfigOption configOption) throws JniException { + this.groupId = client.getGroup(); + this.configOption = configOption; + this.cryptoSuite = client.getCryptoSuite(); + this.eventSubscribe = org.fisco.bcos.sdk.jni.event.EventSubscribe.build(configOption.getJniConfig()); + + logger.info(" EventSub constructor, group: {}, config: {}", groupId, configOption.getJniConfig()); + } + + public CryptoSuite getCryptoSuite() { + return cryptoSuite; + } + + public void setCryptoSuite(CryptoSuite cryptoSuite) { + this.cryptoSuite = cryptoSuite; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public ConfigOption getConfigOption() { + return configOption; + } + + public void setConfigOption(ConfigOption configOption) { + this.configOption = configOption; } @Override - public String subscribeEvent(EventLogParams params, EventCallback callback) { - return ""; + public void subscribeEvent(EventSubParams params, EventSubCallback callback) { + + if (!params.checkParams()) { + callback.onReceiveLog("", EventSubStatus.INVALID_PARAMS.getStatus(), null); + return; + } + + String strParams = null; + try { + strParams = objectMapper.writeValueAsString(params); + } catch (JsonProcessingException e) { + logger.error("e: ", e); + return; + } + + logger.info("EventSub subscribeEvent, params: {}", params); + + eventSubscribe.subscribeEvent(groupId, strParams, new EventSubscribeCallback() { + @Override + public void onResponse(Response response) { + if (response.getErrorCode() != 0) { + logger.error("subscribeEvent response error, errorCode: {}, errorMessage: {}", response.getErrorCode(), response.getErrorMessage()); + callback.onReceiveLog("", response.getErrorCode(), null); + return; + } + + String strResp = new String(response.getData()); + logger.debug("subscribeEvent response, errorCode: {}, errorMessage: {}, data: {}", response.getErrorCode(), response.getErrorMessage(), strResp); + + ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper(); + try { + EventSubResponse eventSubResponse = objectMapper.readValue(strResp, EventSubResponse.class); + callback.onReceiveLog(eventSubResponse.getId(), eventSubResponse.getStatus(), eventSubResponse.getLogs()); + } catch (JsonProcessingException e) { + logger.error("subscribeEvent response parser json error, resp: {}, e: {}", strResp, e); + } + } + }); } @Override - public void unsubscribeEvent(String registerID, EventCallback callback) {} + public void unsubscribeEvent(String eventId) { + eventSubscribe.unsubscribeEvent(eventId); + } @Override public Set getAllSubscribedEvents() { + // TODO: return null; } @Override - public void start() {} + public void start() { eventSubscribe.start(); } @Override - public void stop() {} + public void stop() { eventSubscribe.stop();} } diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java deleted file mode 100644 index c0227a8ff..000000000 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2014-2020 [fisco-dev] - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * - */ - -package org.fisco.bcos.sdk.eventsub.exception; - -public class EventSubException extends Exception { - public EventSubException(String message) { - super(message); - } - - public EventSubException(Throwable cause) { - super(cause); - } - - public EventSubException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupService.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupService.java deleted file mode 100644 index 856291bcd..000000000 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupService.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright 2014-2020 [fisco-dev] - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.fisco.bcos.sdk.service; - -import java.math.BigInteger; -import java.util.Set; - -public interface GroupService { - /** - * Get the node information of the group - * - * @return Nodes' ip and port list - */ - Set getGroupNodesInfo(); - - /** - * remove node from the group - * - * @param nodeAddress the ip and port of the removed node - * @return if nodes in the original list that needed to be removed return True, else false. - */ - boolean removeNode(String nodeAddress); - - /** - * add nodeInfo to the group - * - * @param nodeAddress the node ip and port - * @return if nodes in the original list that needed to be inserted return True, else false. - */ - boolean insertNode(String nodeAddress); - - /** - * update the latest block number of the specified group - * - * @param peerIpAndPort the node that notify the block number info - * @param blockNumber the notified block number - */ - void updatePeersBlockNumberInfo(String peerIpAndPort, BigInteger blockNumber); - - /** - * Get latest block number of this group - * - * @return block number - */ - BigInteger getLatestBlockNumber(); - - /** - * Get node which has the latest block number - * - * @return the node - */ - String getNodeWithTheLatestBlockNumber(); - - /** - * Check the node is exit in the group - * - * @param peer ip and port - * @return whether peer exit - */ - boolean existPeer(String peer); -} diff --git a/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupServiceImpl.java b/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupServiceImpl.java deleted file mode 100644 index c88be7d82..000000000 --- a/sdk-service/src/main/java/org/fisco/bcos/sdk/service/GroupServiceImpl.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Copyright 2014-2020 [fisco-dev] - * - *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.fisco.bcos.sdk.service; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GroupServiceImpl implements GroupService { - - private static Logger logger = LoggerFactory.getLogger(GroupServiceImpl.class); - - private ConcurrentHashMap groupNodeToBlockNumber = - new ConcurrentHashMap<>(); - private Set groupNodeSet = new CopyOnWriteArraySet<>(); - private final String groupId; - private AtomicLong latestBlockNumber = new AtomicLong(0); - private List nodeWithLatestBlockNumber = new CopyOnWriteArrayList(); - - public GroupServiceImpl(String groupId) { - this.groupId = groupId; - } - - public GroupServiceImpl(String groupId, String groupNodeAddress) { - this.groupId = groupId; - this.groupNodeSet.add(groupNodeAddress); - logger.debug("insert group: {} for peer {}", groupId, groupNodeAddress); - } - - @Override - public Set getGroupNodesInfo() { - return this.groupNodeSet; - } - - @Override - public boolean removeNode(String nodeAddress) { - boolean shouldResetLatestBlockNumber = false; - if (this.groupNodeToBlockNumber.containsKey(nodeAddress)) { - this.groupNodeToBlockNumber.remove(nodeAddress); - shouldResetLatestBlockNumber = true; - } - if (this.nodeWithLatestBlockNumber.contains(nodeAddress)) { - this.nodeWithLatestBlockNumber.remove(nodeAddress); - shouldResetLatestBlockNumber = true; - } - if (shouldResetLatestBlockNumber) { - this.resetLatestBlockNumber(); - } - logger.debug( - "g:{}, removeNode={}, blockNumberInfoSize={}, latestBlockNumber:{}", - this.groupId, - nodeAddress, - this.groupNodeToBlockNumber.size(), - this.latestBlockNumber); - if (this.groupNodeSet.contains(nodeAddress)) { - this.groupNodeSet.remove(nodeAddress); - return true; - } - return false; - } - - @Override - public boolean insertNode(String nodeAddress) { - if (!this.groupNodeSet.contains(nodeAddress)) { - this.groupNodeSet.add(nodeAddress); - logger.debug( - "g:{}, insertNode={}, nodeSize={}, blockNumberInfoSize={}", - this.groupId, - nodeAddress, - this.groupNodeSet.size(), - this.groupNodeToBlockNumber.size()); - return true; - } - if (!this.groupNodeToBlockNumber.containsKey(nodeAddress)) { - this.groupNodeToBlockNumber.put(nodeAddress, BigInteger.valueOf(0)); - } - return false; - } - - @Override - public void updatePeersBlockNumberInfo(String peerIpAndPort, BigInteger blockNumber) { - // Note: In order to ensure that the cache information is updated in time when the node is - // restarted, the block height information of the node must be directly updated - if (!this.groupNodeToBlockNumber.containsKey(peerIpAndPort) - || !this.groupNodeToBlockNumber.get(peerIpAndPort).equals(blockNumber)) { - logger.debug( - "updatePeersBlockNumberInfo for {}, updated blockNumber: {}, groupId: {}", - peerIpAndPort, - blockNumber, - this.groupId); - this.groupNodeToBlockNumber.put(peerIpAndPort, blockNumber); - } - if (!this.groupNodeSet.contains(peerIpAndPort)) { - this.groupNodeSet.add(peerIpAndPort); - } - this.updateLatestBlockNumber(peerIpAndPort, blockNumber); - } - - private void updateLatestBlockNumber(String peerIpAndPort, BigInteger blockNumber) { - if (blockNumber.longValue() == this.latestBlockNumber.get() - && !this.nodeWithLatestBlockNumber.contains(peerIpAndPort)) { - this.nodeWithLatestBlockNumber.add(peerIpAndPort); - } - if (blockNumber.longValue() > this.latestBlockNumber.get()) { - this.latestBlockNumber.getAndSet(blockNumber.longValue()); - this.nodeWithLatestBlockNumber.clear(); - this.nodeWithLatestBlockNumber.add(peerIpAndPort); - } - logger.debug( - "g:{}, updateLatestBlockNumber, latestBlockNumber: {}, nodeWithLatestBlockNumber:{}", - this.groupId, - this.latestBlockNumber.get(), - this.nodeWithLatestBlockNumber.toString()); - } - - private void resetLatestBlockNumber() { - BigInteger maxBlockNumber = null; - if (this.groupNodeToBlockNumber.size() == 0) { - this.latestBlockNumber.getAndSet(BigInteger.ZERO.longValue()); - return; - } - for (String groupNode : this.groupNodeToBlockNumber.keySet()) { - BigInteger blockNumber = this.groupNodeToBlockNumber.get(groupNode); - if (blockNumber == null) { - continue; - } - if (maxBlockNumber == null || blockNumber.compareTo(maxBlockNumber) > 0) { - maxBlockNumber = blockNumber; - } - } - - if (maxBlockNumber == null) { - return; - } - this.latestBlockNumber.getAndSet(maxBlockNumber.longValue()); - this.nodeWithLatestBlockNumber.clear(); - for (String groupNode : this.groupNodeToBlockNumber.keySet()) { - BigInteger blockNumber = this.groupNodeToBlockNumber.get(groupNode); - if (this.latestBlockNumber.equals(blockNumber)) { - this.nodeWithLatestBlockNumber.add(groupNode); - } - } - logger.debug( - "g:{}, resetLatestBlockNumber, latestBlockNumber: {}, nodeWithLatestBlockNumber:{}, maxBlockNumber: {}", - this.groupId, - this.latestBlockNumber.get(), - this.nodeWithLatestBlockNumber.toString(), - maxBlockNumber); - } - - @Override - public BigInteger getLatestBlockNumber() { - return BigInteger.valueOf(this.latestBlockNumber.get()); - } - - @Override - public String getNodeWithTheLatestBlockNumber() { - try { - // in case of nodeWithLatestBlockNumber modified - final List tmpNodeWithLatestBlockNumber = - new ArrayList<>(this.nodeWithLatestBlockNumber); - // the case that the sdk is allowed to access all the connected node, select the first - // connected node to send the request - if (tmpNodeWithLatestBlockNumber.size() > 0) { - // Note: when the nodeWithLatestBlockNumber modified, and the random value - // calculated after the modification, and the nodeWithLatestBlockNumber.get is - // called after the modification, this function will throw - // ArrayIndexOutOfBoundsException - int random = (int) (Math.random() * (tmpNodeWithLatestBlockNumber.size())); - return tmpNodeWithLatestBlockNumber.get(random); - } - } catch (Exception e) { - logger.warn( - "getNodeWithTheLatestBlockNumber failed for {}, select the node to send message randomly", - e); - } - // select the first element - if (!this.groupNodeSet.isEmpty()) { - return this.groupNodeSet.iterator().next(); - } - return null; - } - - @Override - public boolean existPeer(String peer) { - return this.groupNodeSet.contains(peer); - } -}