From e9211c359e9beb942320f46482fa2167e876e11b Mon Sep 17 00:00:00 2001 From: maggie Date: Thu, 23 Jul 2020 17:29:01 +0800 Subject: [PATCH 1/2] 1. add event subscribe module and implement subscribe function 2. rename utils --- .../sdk/crypto/keypair/CryptoKeyPair.java | 4 +- .../bcos/sdk/eventsub/EventCallback.java | 31 ++- .../bcos/sdk/eventsub/EventLogParams.java | 84 +++++- .../bcos/sdk/eventsub/EventSubscribe.java | 13 +- .../bcos/sdk/eventsub/EventSubscribeImp.java | 180 ++++++++++++ .../EventSubException.java} | 16 +- .../sdk/eventsub/filter/EventLogFilter.java | 126 +++++++++ .../eventsub/filter/EventLogFilterStatus.java | 37 +++ .../sdk/eventsub/filter/EventLogResponse.java | 60 ++++ .../eventsub/filter/EventPushMsgHandler.java | 87 ++++++ .../filter/EventSubNodeRespStatus.java | 101 +++++++ .../sdk/eventsub/filter/FilterManager.java | 84 ++++++ .../org/fisco/bcos/sdk/model/EventLog.java | 256 ++++++++++++++++++ .../org/fisco/bcos/sdk/model/MsgType.java | 14 +- .../impl/executor/TransactionDecoder.java | 2 +- .../executor/TransactionDecoderInterface.java | 2 +- .../EventLog.java => utils/AddressUtils.java} | 13 +- .../utils/{ByteUtil.java => ByteUtils.java} | 112 ++++++-- .../java/org/fisco/bcos/sdk/utils/Hex.java | 2 +- .../org/fisco/bcos/sdk/utils/Numeric.java | 6 +- .../utils/{Strings.java => StringUtils.java} | 4 +- 21 files changed, 1184 insertions(+), 50 deletions(-) create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java rename src/main/java/org/fisco/bcos/sdk/eventsub/{EventLogFilter.java => exception/EventSubException.java} (62%) create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilter.java create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilterStatus.java create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java create mode 100644 src/main/java/org/fisco/bcos/sdk/model/EventLog.java rename src/main/java/org/fisco/bcos/sdk/{transaction/domain/EventLog.java => utils/AddressUtils.java} (60%) rename src/main/java/org/fisco/bcos/sdk/utils/{ByteUtil.java => ByteUtils.java} (89%) rename src/main/java/org/fisco/bcos/sdk/utils/{Strings.java => StringUtils.java} (99%) diff --git a/src/main/java/org/fisco/bcos/sdk/crypto/keypair/CryptoKeyPair.java b/src/main/java/org/fisco/bcos/sdk/crypto/keypair/CryptoKeyPair.java index 7e32f44d0..9ef159199 100644 --- a/src/main/java/org/fisco/bcos/sdk/crypto/keypair/CryptoKeyPair.java +++ b/src/main/java/org/fisco/bcos/sdk/crypto/keypair/CryptoKeyPair.java @@ -24,7 +24,7 @@ import org.fisco.bcos.sdk.crypto.hash.Hash; import org.fisco.bcos.sdk.utils.Hex; import org.fisco.bcos.sdk.utils.Numeric; -import org.fisco.bcos.sdk.utils.Strings; +import org.fisco.bcos.sdk.utils.StringUtils; import org.fisco.bcos.sdk.utils.exceptions.DecoderException; public abstract class CryptoKeyPair { @@ -146,7 +146,7 @@ protected String getPublicKeyNoPrefix(String publicKeyStr) { // Hexadecimal public key length is less than 128, add 0 in front if (publicKeyNoPrefix.length() < PUBLIC_KEY_LENGTH_IN_HEX) { publicKeyNoPrefix = - Strings.zeros(PUBLIC_KEY_LENGTH_IN_HEX - publicKeyNoPrefix.length()) + StringUtils.zeros(PUBLIC_KEY_LENGTH_IN_HEX - publicKeyNoPrefix.length()) + publicKeyNoPrefix; } return publicKeyNoPrefix; diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java index a85724c8b..45bddcc8b 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java @@ -15,4 +15,33 @@ package org.fisco.bcos.sdk.eventsub; -public abstract class EventCallback {} +import java.math.BigInteger; +import java.util.List; +import org.fisco.bcos.sdk.model.EventLog; + +public abstract class EventCallback { + private BigInteger lastBlockNumber = null; + private long logCount = 0; + + public BigInteger getLastBlockNumber() { + return lastBlockNumber; + } + + public void updateCountsAndLatestBlock(List logs) { + if (logs.isEmpty()) { + return; + } + EventLog latestOne = logs.get(logs.size() - 1); + if (lastBlockNumber == null) { + lastBlockNumber = latestOne.getBlockNumber(); + logCount += logs.size(); + } else { + if (latestOne.getBlockNumber().compareTo(lastBlockNumber) > 0) { + lastBlockNumber = latestOne.getBlockNumber(); + logCount += logs.size(); + } + } + } + + public abstract void onReceiveLog(int status, List logs); +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java index 2acd27f8c..acbaf8478 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java @@ -15,4 +15,86 @@ package org.fisco.bcos.sdk.eventsub; -public class EventLogParams {} +import java.util.List; +import org.fisco.bcos.sdk.utils.AddressUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventLogParams { + private static Logger logger = LoggerFactory.getLogger(EventLogParams.class); + private String fromBlock; + private String toBlock; + private List addresses; + private List topics; + + public static Logger getLogger() { + return logger; + } + + public static void setLogger(Logger logger) { + EventLogParams.logger = logger; + } + + public String getFromBlock() { + return fromBlock; + } + + public void setFromBlock(String fromBlock) { + this.fromBlock = fromBlock; + } + + public String getToBlock() { + return toBlock; + } + + public void setToBlock(String toBlock) { + this.toBlock = toBlock; + } + + public List getAddresses() { + return addresses; + } + + public void setAddresses(List addresses) { + this.addresses = addresses; + } + + public List getTopics() { + return topics; + } + + public void setTopics(List topics) { + this.topics = topics; + } + + @Override + public String toString() { + return "EventLogFilterParams [fromBlock=" + + fromBlock + + ", toBlock=" + + toBlock + + ", addresses=" + + addresses + + ", topics=" + + topics + + "]"; + } + + public boolean validAddresses() { + if (getAddresses() == null) { + return false; + } + for (String address : getAddresses()) { + // check if address valid + if (!AddressUtils.isValidAddress(address)) { + return false; + } + } + return true; + } + + public boolean valid() { + // todo add other verifications + return validAddresses(); + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java index 5ac460ea4..4d220fdd8 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java @@ -16,7 +16,9 @@ package org.fisco.bcos.sdk.eventsub; import java.util.List; +import java.util.UUID; import org.fisco.bcos.sdk.channel.Channel; +import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter; /** * Event subscribe interface. @@ -32,7 +34,12 @@ public interface EventSubscribe { * @return EventSubscribe Object */ static EventSubscribe build(Channel ch, String groupId) { - return null; + return new EventSubscribeImp(ch, groupId); + } + + static String newSeq() { + String seq = UUID.randomUUID().toString().replaceAll("-", ""); + return seq; } /** @@ -46,9 +53,9 @@ static EventSubscribe build(Channel ch, String groupId) { /** * Unsubscribe events * - * @param filterId + * @param filterID */ - void unsubscribeEvent(String filterId); + void unsubscribeEvent(String filterID); /** * Get all subscribed event. diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java new file mode 100644 index 000000000..e079492e4 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java @@ -0,0 +1,180 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; +import org.fisco.bcos.sdk.channel.Channel; +import org.fisco.bcos.sdk.channel.ResponseCallback; +import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter; +import org.fisco.bcos.sdk.eventsub.filter.EventLogFilterStatus; +import org.fisco.bcos.sdk.eventsub.filter.EventLogResponse; +import org.fisco.bcos.sdk.eventsub.filter.EventPushMsgHandler; +import org.fisco.bcos.sdk.eventsub.filter.EventSubNodeRespStatus; +import org.fisco.bcos.sdk.eventsub.filter.FilterManager; +import org.fisco.bcos.sdk.model.Message; +import org.fisco.bcos.sdk.model.MsgType; +import org.fisco.bcos.sdk.model.Response; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventSubscribeImp implements EventSubscribe { + private static final Logger logger = LoggerFactory.getLogger(EventSubscribeImp.class); + private Channel ch; + private String groupId; + private FilterManager filterManager; + private EventPushMsgHandler msgHander; + private boolean running = false; + + public EventSubscribeImp(Channel ch, String groupId) { + this.ch = ch; + this.groupId = groupId; + filterManager = new FilterManager(); + msgHander = new EventPushMsgHandler(filterManager); + ch.addMessageHandler(MsgType.EVENT_LOG_PUSH, msgHander); + } + + @Override + public void subscribeEvent(EventLogParams params, EventCallback callback) { + if (!params.valid()) { + callback.onReceiveLog(EventSubNodeRespStatus.INVALID_PARAMS.getStatus(), null); + return; + } + EventLogFilter filter = new EventLogFilter(); + filter.setRegisterID(EventSubscribe.newSeq()); + filter.setParams(params); + filter.setCallback(callback); + filterManager.addFilter(filter); + sendFilter(filter); + } + + @Override + public void unsubscribeEvent(String filterID) { + // Todo need node support + } + + @Override + public List getAllSubscribedEvent() { + return filterManager.getAllSubscribedEvent(); + } + + @Override + public void start() { + if (running) { + return; + } + + running = true; + } + + @Override + public void stop() { + // todo + } + + private void sendFilter(EventLogFilter filter) { + Message msg = new Message(); + msg.setSeq(EventSubscribe.newSeq()); + msg.setType((short) MsgType.CLIENT_REGISTER_EVENT_LOG.ordinal()); + msg.setResult(0); + try { + String content = filter.getNewParamJsonString(groupId); + msg.setData(content.getBytes()); + } catch (JsonProcessingException e) { + logger.error( + "send filter error, registerID: {},filterID : {}, error: {}", + filter.getRegisterID(), + filter.getFilterID(), + e.getMessage()); + logger.error( + "remove bad filter , registerID: {},filterID : {}", + filter.getRegisterID(), + filter.getFilterID()); + filterManager.removeFilter(filter.getRegisterID()); + } + + filterManager.addCallback(filter.getFilterID(), filter.getCallback()); + + // Todo check send to group function. this function may not in channel module. + ch.asyncSendToGroup( + msg, + groupId, + new RegisterEventSubRespCallback( + filterManager, filter, filter.getFilterID(), filter.getRegisterID())); + } + + class RegisterEventSubRespCallback extends ResponseCallback { + FilterManager filterManager; + EventLogFilter filter; + String filterID; + String registerID; + + public RegisterEventSubRespCallback( + FilterManager filterManager, + EventLogFilter filter, + String filterID, + String registerID) { + this.filterManager = filterManager; + this.filter = filter; + this.filterID = filterID; + this.registerID = registerID; + } + + @Override + public void onResponse(Response response) { + logger.info( + " event filter callback response, registerID: {}, filterID: {}, seq: {}, error code: {}, content: {}", + registerID, + filterID, + response.getMessageID(), + response.getErrorCode(), + response.getContent()); + try { + if (0 == response.getErrorCode()) { + // todo add success logic + EventLogResponse resp = + ObjectMapperFactory.getObjectMapper() + .readValue(response.getContent(), EventLogResponse.class); + if (resp.getResult() == 0) { + // node give an "OK" response, event log will be pushed soon + filterManager.updateFilterStatus( + filter, EventLogFilterStatus.EVENT_LOG_PUSHING, null); + } else { + // node give a bad response, will not push event log, trigger callback + filter.getCallback().onReceiveLog(resp.getResult(), null); + filterManager.removeFilter(registerID); + filterManager.removeCallback(filterID); + } + } else { + filterManager.updateFilterStatus( + filter, EventLogFilterStatus.WAITING_REQUEST, null); + filterManager.removeCallback(filterID); + } + } catch (Exception e) { + logger.error( + " event filter response message exception, filterID: {}, registerID: {}, exception message: {}", + filterID, + registerID, + e.getMessage()); + filter.getCallback() + .onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null); + filterManager.removeFilter(registerID); + filterManager.removeCallback(filterID); + } + } + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogFilter.java b/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java similarity index 62% rename from src/main/java/org/fisco/bcos/sdk/eventsub/EventLogFilter.java rename to src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java index 350f0e6ed..c0227a8ff 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventLogFilter.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/exception/EventSubException.java @@ -13,6 +13,18 @@ * */ -package org.fisco.bcos.sdk.eventsub; +package org.fisco.bcos.sdk.eventsub.exception; -public class EventLogFilter {} +public class EventSubException extends Exception { + public EventSubException(String message) { + super(message); + } + + public EventSubException(Throwable cause) { + super(cause); + } + + public EventSubException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilter.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilter.java new file mode 100644 index 000000000..b8123921c --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilter.java @@ -0,0 +1,126 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.channel.ChannelHandlerContext; +import org.fisco.bcos.sdk.eventsub.EventCallback; +import org.fisco.bcos.sdk.eventsub.EventLogParams; +import org.fisco.bcos.sdk.eventsub.EventSubscribe; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; + +/** An event log filter is a subscription. */ +public class EventLogFilter { + private String registerID; + private String filterID; + private EventLogParams params; + private EventCallback callback; + private EventLogFilterStatus status = EventLogFilterStatus.WAITING_REQUEST; + private ChannelHandlerContext ctx = null; + + public String getNewParamJsonString(String groupId) throws JsonProcessingException { + String newFilterId = EventSubscribe.newSeq(); + EventLogRequestParams requestParams = + new EventLogRequestParams(generateNewParams(), groupId, newFilterId); + String content = ObjectMapperFactory.getObjectMapper().writeValueAsString(requestParams); + filterID = newFilterId; + return content; + } + + public class EventLogRequestParams extends EventLogParams { + private String groupID; + private String filterID; + private int timeout = 0; + + public EventLogRequestParams(EventLogParams params, String groupID, String filterID) { + this.setFromBlock(params.getFromBlock()); + this.setToBlock(params.getToBlock()); + this.setAddresses(params.getAddresses()); + this.setTopics(params.getTopics()); + this.setGroupID(groupID); + this.setFilterID(filterID); + } + + public void setGroupID(String groupID) { + this.groupID = groupID; + } + + public void setFilterID(String filterID) { + this.filterID = filterID; + } + } + + private EventLogParams generateNewParams() { + EventLogParams params = new EventLogParams(); + params.setToBlock(getParams().getToBlock()); + params.setAddresses(getParams().getAddresses()); + params.setTopics(getParams().getTopics()); + if (callback.getLastBlockNumber() == null) { + params.setFromBlock(params.getFromBlock()); + } else { + params.setFromBlock(callback.getLastBlockNumber().toString()); + } + return params; + } + + public String getRegisterID() { + return registerID; + } + + public void setRegisterID(String registerID) { + this.registerID = registerID; + } + + public EventLogParams getParams() { + return params; + } + + public void setParams(EventLogParams params) { + this.params = params; + } + + public EventLogFilterStatus getStatus() { + return status; + } + + public void setStatus(EventLogFilterStatus status) { + this.status = status; + } + + public EventCallback getCallback() { + return callback; + } + + public void setCallback(EventCallback callback) { + this.callback = callback; + } + + public String getFilterID() { + return filterID; + } + + public void setFilterID(String filterID) { + this.filterID = filterID; + } + + public ChannelHandlerContext getCtx() { + return ctx; + } + + public void setCtx(ChannelHandlerContext ctx) { + this.ctx = ctx; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilterStatus.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilterStatus.java new file mode 100644 index 000000000..661b89242 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogFilterStatus.java @@ -0,0 +1,37 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +public enum EventLogFilterStatus { + // event log is pushing from node normally + EVENT_LOG_PUSHING(0x1), + // request already send, wait for response + WAITING_RESPONSE(0x2), + // response not ok, wait for resend + WAITING_REQUEST(0x3); + + private int status; + + private EventLogFilterStatus(int i) {} + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java new file mode 100644 index 000000000..f211426c8 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventLogResponse.java @@ -0,0 +1,60 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +import java.util.List; +import org.fisco.bcos.sdk.model.EventLog; + +public class EventLogResponse { + private int result; + private String filterID; + private List logs; + + @Override + public String toString() { + return "EventLogResponse [result=" + + result + + ", filterID=" + + filterID + + ", logs=" + + logs + + "]"; + } + + public int getResult() { + return result; + } + + public void setResult(int result) { + this.result = result; + } + + public String getFilterID() { + return filterID; + } + + public void setFilterID(String filterID) { + this.filterID = filterID; + } + + public List getLogs() { + return logs; + } + + public void setLogs(List logs) { + this.logs = logs; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java new file mode 100644 index 000000000..752e08661 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java @@ -0,0 +1,87 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.channel.ChannelHandlerContext; +import java.util.List; +import org.fisco.bcos.sdk.eventsub.EventCallback; +import org.fisco.bcos.sdk.model.EventLog; +import org.fisco.bcos.sdk.model.Message; +import org.fisco.bcos.sdk.network.MsgHandler; +import org.fisco.bcos.sdk.utils.ObjectMapperFactory; +import org.fisco.bcos.sdk.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** EventPushMsgHandler is the type of EVENT_LOG_PUSH message handler. */ +public class EventPushMsgHandler implements MsgHandler { + private static final Logger logger = LoggerFactory.getLogger(EventPushMsgHandler.class); + FilterManager filterManager; + + public EventPushMsgHandler(FilterManager filterManager) { + this.filterManager = filterManager; + } + + @Override + public void onConnect(ChannelHandlerContext ctx) { + logger.warn("onConnect accidentally called"); + } + + @Override + public void onMessage(ChannelHandlerContext ctx, Message msg) { + String content = new String(msg.getData()); + try { + EventLogResponse resp = + ObjectMapperFactory.getObjectMapper() + .readValue(content, EventLogResponse.class); + if (resp == null || StringUtils.isEmpty(resp.getFilterID())) { + logger.error(" event log response invalid format, content: {}", content); + return; + } + + EventCallback callback = filterManager.getCallBack(resp.getFilterID()); + + if (callback == null) { + logger.debug( + " event log push message cannot find callback, filterID: {}, content: {}", + resp.getFilterID(), + content); + return; + } + + if (resp.getResult() == EventSubNodeRespStatus.SUCCESS.getStatus()) { + List logs = resp.getLogs(); + if (!logs.isEmpty()) { + callback.onReceiveLog(resp.getResult(), logs); + // update status + callback.updateCountsAndLatestBlock(logs); + logger.info( + " log size: {}, blocknumber: {}", + logs.size(), + logs.get(0).getBlockNumber()); + } + } + } catch (JsonProcessingException e) { + // todo handle exception + } + } + + @Override + public void onDisconnect(ChannelHandlerContext ctx) { + logger.warn("onDisconnect accidentally called"); + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java new file mode 100644 index 000000000..75da90faf --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventSubNodeRespStatus.java @@ -0,0 +1,101 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +public enum EventSubNodeRespStatus { + /** When make a subscribe of an event, the node would response the status. */ + SUCCESS(0), + PUSH_COMPLETED(1), + INVALID_PARAMS(-41000), + INVALID_REQUEST(-41001), + GROUP_NOT_EXIST(-41002), + INVALID_RANGE(-41003), + INVALID_RESPONSE(-41004), + REQUEST_TIMEOUT(-41005), + SDK_PERMISSION_DENIED(-41006), + // reserve 100 errors + OTHER_ERROR(42000), + ; + + private int status; + + private EventSubNodeRespStatus(int status) { + this.setStatus(status); + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public static EventSubNodeRespStatus fromIntStatus(int status) { + for (EventSubNodeRespStatus e : EventSubNodeRespStatus.values()) { + if (e.getStatus() == status) { + return e; + } + } + return EventSubNodeRespStatus.OTHER_ERROR; + } + + public static String getDescMessage(int status) { + return getDescMessage(fromIntStatus(status)); + } + + public static String getDescMessage(EventSubNodeRespStatus status) { + + String desc; + + switch (status) { + case SUCCESS: + desc = "success"; + break; + case PUSH_COMPLETED: + desc = "push completed"; + break; + case INVALID_PARAMS: + desc = "params invalid"; + break; + case INVALID_REQUEST: + desc = "register request not valid format"; + break; + case REQUEST_TIMEOUT: + desc = "register request timeout"; + break; + case GROUP_NOT_EXIST: + desc = "group not exist"; + break; + case INVALID_RANGE: + desc = "register parameters not in a range within permision"; + break; + case INVALID_RESPONSE: + desc = "response message not invalid format"; + break; + case SDK_PERMISSION_DENIED: + desc = "the SDK is not allowed to access this group."; + break; + default: + { + desc = "other errors"; + break; + } + } + + return desc; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java new file mode 100644 index 000000000..2e840375a --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java @@ -0,0 +1,84 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.fisco.bcos.sdk.eventsub.EventCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter manager is to maintain a EventLogFilter list, as well as a EventCallback list. Include + * add, remove, update operations to the two list and filters. + */ +public class FilterManager { + private static final Logger logger = LoggerFactory.getLogger(FilterManager.class); + + /** + * Register id of a filter is generated when register a subscribe. Filter id of a filter is + * generated when send one subscribe request to node. + */ + private Map regId2Filter = + new ConcurrentHashMap(); + + private Map filterID2Callback = + new ConcurrentHashMap(); + + public List getAllSubscribedEvent() { + List list = new ArrayList<>(); + regId2Filter.forEach( + (regId, filter) -> { + list.add(filter); + }); + return list; + } + + public void addFilter(EventLogFilter filter) { + regId2Filter.put(filter.getRegisterID(), filter); + logger.info( + "add event log filter , registerID: {}, filter: {}", + filter.getRegisterID(), + filter); + } + + public void removeFilter(String registerId) { + logger.info("remove filter"); + } + + public void addCallback(String filterID, EventCallback callback) { + filterID2Callback.put(filterID, callback); + } + + public void removeCallback(String filterID) { + filterID2Callback.remove(filterID); + } + + public void updateFilterStatus( + EventLogFilter filter, EventLogFilterStatus status, ChannelHandlerContext ctx) { + synchronized (this) { + filter.setStatus(status); + filter.setCtx(ctx); + } + } + + public EventCallback getCallBack(String filterID) { + return filterID2Callback.get(filterID); + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/model/EventLog.java b/src/main/java/org/fisco/bcos/sdk/model/EventLog.java new file mode 100644 index 000000000..ffb7e718c --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/model/EventLog.java @@ -0,0 +1,256 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ +package org.fisco.bcos.sdk.model; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.math.BigInteger; +import java.util.List; +import org.fisco.bcos.sdk.utils.Numeric; + +public class EventLog { + private boolean removed; + private String logIndex; + private String transactionIndex; + private String transactionHash; + private String blockHash; + private String blockNumber; + private String address; + private String data; + private String type; + private List topics; + + public EventLog() {} + + public EventLog( + boolean removed, + String logIndex, + String transactionIndex, + String transactionHash, + String blockHash, + String blockNumber, + String address, + String data, + String type, + List topics) { + this.removed = removed; + this.logIndex = logIndex; + this.transactionIndex = transactionIndex; + this.transactionHash = transactionHash; + this.blockHash = blockHash; + this.blockNumber = blockNumber; + this.address = address; + this.data = data; + this.type = type; + this.topics = topics; + } + + @JsonIgnore + public boolean isRemoved() { + return removed; + } + + public void setRemoved(boolean removed) { + this.removed = removed; + } + + public BigInteger getLogIndex() { + return convert(logIndex); + } + + @JsonIgnore + public String getLogIndexRaw() { + return logIndex; + } + + public void setLogIndex(String logIndex) { + this.logIndex = logIndex; + } + + public BigInteger getTransactionIndex() { + return convert(transactionIndex); + } + + @JsonIgnore + public String getTransactionIndexRaw() { + return transactionIndex; + } + + public void setTransactionIndex(String transactionIndex) { + this.transactionIndex = transactionIndex; + } + + public String getTransactionHash() { + return transactionHash; + } + + public void setTransactionHash(String transactionHash) { + this.transactionHash = transactionHash; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + + public BigInteger getBlockNumber() { + return convert(blockNumber); + } + + @JsonIgnore + public String getBlockNumberRaw() { + return blockNumber; + } + + public void setBlockNumber(String blockNumber) { + this.blockNumber = blockNumber; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + @JsonIgnore + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public List getTopics() { + return topics; + } + + public void setTopics(List topics) { + this.topics = topics; + } + + private BigInteger convert(String value) { + if (value != null) { + return Numeric.decodeQuantity(value); + } else { + return null; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof EventLog)) { + return false; + } + + EventLog log = (EventLog) o; + + if (isRemoved() != log.isRemoved()) { + return false; + } + if (getLogIndexRaw() != null + ? !getLogIndexRaw().equals(log.getLogIndexRaw()) + : log.getLogIndexRaw() != null) { + return false; + } + if (getTransactionIndexRaw() != null + ? !getTransactionIndexRaw().equals(log.getTransactionIndexRaw()) + : log.getTransactionIndexRaw() != null) { + return false; + } + if (getTransactionHash() != null + ? !getTransactionHash().equals(log.getTransactionHash()) + : log.getTransactionHash() != null) { + return false; + } + if (getBlockHash() != null + ? !getBlockHash().equals(log.getBlockHash()) + : log.getBlockHash() != null) { + return false; + } + if (getBlockNumberRaw() != null + ? !getBlockNumberRaw().equals(log.getBlockNumberRaw()) + : log.getBlockNumberRaw() != null) { + return false; + } + if (getAddress() != null + ? !getAddress().equals(log.getAddress()) + : log.getAddress() != null) { + return false; + } + if (getData() != null ? !getData().equals(log.getData()) : log.getData() != null) { + return false; + } + if (getType() != null ? !getType().equals(log.getType()) : log.getType() != null) { + return false; + } + return getTopics() != null ? getTopics().equals(log.getTopics()) : log.getTopics() == null; + } + + @Override + public int hashCode() { + int result = (isRemoved() ? 1 : 0); + result = 31 * result + (getLogIndexRaw() != null ? getLogIndexRaw().hashCode() : 0); + result = + 31 * result + + (getTransactionIndexRaw() != null + ? getTransactionIndexRaw().hashCode() + : 0); + result = 31 * result + (getTransactionHash() != null ? getTransactionHash().hashCode() : 0); + result = 31 * result + (getBlockHash() != null ? getBlockHash().hashCode() : 0); + result = 31 * result + (getBlockNumberRaw() != null ? getBlockNumberRaw().hashCode() : 0); + result = 31 * result + (getAddress() != null ? getAddress().hashCode() : 0); + result = 31 * result + (getData() != null ? getData().hashCode() : 0); + result = 31 * result + (getType() != null ? getType().hashCode() : 0); + result = 31 * result + (getTopics() != null ? getTopics().hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "Log [logIndex=" + + logIndex + + ", transactionIndex=" + + transactionIndex + + ", transactionHash=" + + transactionHash + + ", blockHash=" + + blockHash + + ", blockNumber=" + + blockNumber + + ", address=" + + address + + ", data=" + + data + + ", topics=" + + topics + + "]"; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/model/MsgType.java b/src/main/java/org/fisco/bcos/sdk/model/MsgType.java index e6e3c62ff..fcdbcd151 100644 --- a/src/main/java/org/fisco/bcos/sdk/model/MsgType.java +++ b/src/main/java/org/fisco/bcos/sdk/model/MsgType.java @@ -54,5 +54,17 @@ public enum MsgType { REQUEST_TOPICCERT(0x37), UPDATE_TOPIICSTATUS(0x38); - MsgType(int i) {} + private int type; + + private MsgType(int type) { + this.setType(type); + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } } diff --git a/src/main/java/org/fisco/bcos/sdk/transaction/core/impl/executor/TransactionDecoder.java b/src/main/java/org/fisco/bcos/sdk/transaction/core/impl/executor/TransactionDecoder.java index 4298b5c0c..95d8253d2 100644 --- a/src/main/java/org/fisco/bcos/sdk/transaction/core/impl/executor/TransactionDecoder.java +++ b/src/main/java/org/fisco/bcos/sdk/transaction/core/impl/executor/TransactionDecoder.java @@ -16,8 +16,8 @@ import java.util.List; import java.util.Map; +import org.fisco.bcos.sdk.model.EventLog; import org.fisco.bcos.sdk.transaction.core.interf.executor.TransactionDecoderInterface; -import org.fisco.bcos.sdk.transaction.domain.EventLog; import org.fisco.bcos.sdk.transaction.domain.EventResultEntity; import org.fisco.bcos.sdk.transaction.domain.InputAndOutputResult; import org.fisco.bcos.sdk.transaction.domain.RawTransaction; diff --git a/src/main/java/org/fisco/bcos/sdk/transaction/core/interf/executor/TransactionDecoderInterface.java b/src/main/java/org/fisco/bcos/sdk/transaction/core/interf/executor/TransactionDecoderInterface.java index 2ee6542ec..60cce3538 100644 --- a/src/main/java/org/fisco/bcos/sdk/transaction/core/interf/executor/TransactionDecoderInterface.java +++ b/src/main/java/org/fisco/bcos/sdk/transaction/core/interf/executor/TransactionDecoderInterface.java @@ -16,7 +16,7 @@ import java.util.List; import java.util.Map; -import org.fisco.bcos.sdk.transaction.domain.EventLog; +import org.fisco.bcos.sdk.model.EventLog; import org.fisco.bcos.sdk.transaction.domain.EventResultEntity; import org.fisco.bcos.sdk.transaction.domain.InputAndOutputResult; import org.fisco.bcos.sdk.transaction.domain.RawTransaction; diff --git a/src/main/java/org/fisco/bcos/sdk/transaction/domain/EventLog.java b/src/main/java/org/fisco/bcos/sdk/utils/AddressUtils.java similarity index 60% rename from src/main/java/org/fisco/bcos/sdk/transaction/domain/EventLog.java rename to src/main/java/org/fisco/bcos/sdk/utils/AddressUtils.java index 59b5b81ae..7386f2993 100644 --- a/src/main/java/org/fisco/bcos/sdk/transaction/domain/EventLog.java +++ b/src/main/java/org/fisco/bcos/sdk/utils/AddressUtils.java @@ -12,6 +12,15 @@ * the License. * */ -package org.fisco.bcos.sdk.transaction.domain; -public class EventLog {} +package org.fisco.bcos.sdk.utils; + +public class AddressUtils { + public static final int ADDRESS_SIZE = 160; + public static final int ADDRESS_LENGTH_IN_HEX = ADDRESS_SIZE >> 2; + + public static boolean isValidAddress(String address) { + String addressNoPrefix = Numeric.cleanHexPrefix(address); + return addressNoPrefix.length() == ADDRESS_LENGTH_IN_HEX; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/utils/ByteUtil.java b/src/main/java/org/fisco/bcos/sdk/utils/ByteUtils.java similarity index 89% rename from src/main/java/org/fisco/bcos/sdk/utils/ByteUtil.java rename to src/main/java/org/fisco/bcos/sdk/utils/ByteUtils.java index 627538519..79f24a922 100644 --- a/src/main/java/org/fisco/bcos/sdk/utils/ByteUtil.java +++ b/src/main/java/org/fisco/bcos/sdk/utils/ByteUtils.java @@ -23,7 +23,7 @@ import java.util.HashSet; import java.util.Set; -public class ByteUtil { +public class ByteUtils { public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; public static final byte[] ZERO_BYTE_ARRAY = new byte[] {0}; @@ -44,7 +44,9 @@ public static byte[] appendByte(byte[] bytes, byte b) { * @return numBytes byte long array. */ public static byte[] bigIntegerToBytes(BigInteger b, int numBytes) { - if (b == null) return null; + if (b == null) { + return null; + } byte[] bytes = new byte[numBytes]; byte[] biBytes = b.toByteArray(); int start = (biBytes.length == numBytes + 1) ? 1 : 0; @@ -54,7 +56,9 @@ public static byte[] bigIntegerToBytes(BigInteger b, int numBytes) { } public static byte[] bigIntegerToBytes(BigInteger value) { - if (value == null) return null; + if (value == null) { + return null; + } byte[] data = value.toByteArray(); @@ -67,7 +71,9 @@ public static byte[] bigIntegerToBytes(BigInteger value) { } public static byte[] bigIntegerToBytesSigned(BigInteger b, int numBytes) { - if (b == null) return null; + if (b == null) { + return null; + } byte[] bytes = new byte[numBytes]; Arrays.fill(bytes, b.signum() < 0 ? (byte) 0xFF : 0x00); byte[] biBytes = b.toByteArray(); @@ -99,7 +105,9 @@ public static int matchingNibbleLength(byte[] a, byte[] b) { int i = 0; int length = a.length < b.length ? a.length : b.length; while (i < length) { - if (a[i] != b[i]) return i; + if (a[i] != b[i]) { + return i; + } i++; } return i; @@ -124,7 +132,9 @@ public static byte[] longToBytes(long val) { public static byte[] longToBytesNoLeadZeroes(long val) { // todo: improve performance by while strip numbers until (long >> 8 == 0) - if (val == 0) return EMPTY_BYTE_ARRAY; + if (val == 0) { + return EMPTY_BYTE_ARRAY; + } byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(val).array(); @@ -149,7 +159,9 @@ public static byte[] intToBytes(int val) { */ public static byte[] intToBytesNoLeadZeroes(int val) { - if (val == 0) return EMPTY_BYTE_ARRAY; + if (val == 0) { + return EMPTY_BYTE_ARRAY; + } int length = 0; @@ -197,7 +209,9 @@ public static byte[] calcPacketLength(byte[] msg) { * @return unsigned positive int value. */ public static int byteArrayToInt(byte[] b) { - if (b == null || b.length == 0) return 0; + if (b == null || b.length == 0) { + return 0; + } return new BigInteger(1, b).intValue(); } @@ -210,7 +224,9 @@ public static int byteArrayToInt(byte[] b) { * @return unsigned positive long value. */ public static long byteArrayToLong(byte[] b) { - if (b == null || b.length == 0) return 0; + if (b == null || b.length == 0) { + return 0; + } return new BigInteger(1, b).longValue(); } @@ -233,7 +249,9 @@ public static String nibblesToPrettyString(byte[] nibbles) { public static String oneByteToHexString(byte value) { String retVal = Integer.toString(value & 0xFF, 16); - if (retVal.length() == 1) retVal = "0" + retVal; + if (retVal.length() == 1) { + retVal = "0" + retVal; + } return retVal; } @@ -252,7 +270,9 @@ public static int numBytes(String val) { bInt = bInt.shiftRight(8); ++bytes; } - if (bytes == 0) ++bytes; + if (bytes == 0) { + ++bytes; + } return bytes; } @@ -265,14 +285,19 @@ private static byte[] encodeValFor32Bits(Object arg) { byte[] data; // check if the string is numeric - if (arg.toString().trim().matches("-?\\d+(\\.\\d+)?")) + if (arg.toString().trim().matches("-?\\d+(\\.\\d+)?")) { data = new BigInteger(arg.toString().trim()).toByteArray(); + } // check if it's hex number - else if (arg.toString().trim().matches("0[xX][0-9a-fA-F]+")) + else if (arg.toString().trim().matches("0[xX][0-9a-fA-F]+")) { data = new BigInteger(arg.toString().trim().substring(2), 16).toByteArray(); - else data = arg.toString().trim().getBytes(); + } else { + data = arg.toString().trim().getBytes(); + } - if (data.length > 32) throw new RuntimeException("values can't be more than 32 byte"); + if (data.length > 32) { + throw new RuntimeException("values can't be more than 32 byte"); + } byte[] val = new byte[32]; @@ -314,7 +339,9 @@ public static int firstNonZeroByte(byte[] data) { public static byte[] stripLeadingZeroes(byte[] data) { - if (data == null) return null; + if (data == null) { + return null; + } final int firstNonZero = firstNonZeroByte(data); switch (firstNonZero) { @@ -343,7 +370,9 @@ public static boolean increment(byte[] bytes) { int i; for (i = bytes.length - 1; i >= startIndex; i--) { bytes[i]++; - if (bytes[i] != 0) break; + if (bytes[i] != 0) { + break; + } } // we return false when all bytes are 0 again return (i >= startIndex || bytes[startIndex] != 0); @@ -357,7 +386,7 @@ public static boolean increment(byte[] bytes) { * @return Byte array of given size with a copy of the src */ public static byte[] copyToArray(BigInteger value) { - byte[] src = ByteUtil.bigIntegerToBytes(value); + byte[] src = ByteUtils.bigIntegerToBytes(value); byte[] dest = ByteBuffer.allocate(32).array(); if (src != null) { System.arraycopy(src, 0, dest, dest.length - src.length, src.length); @@ -371,15 +400,20 @@ public static byte[] copyToArray(BigInteger value) { public static byte[] setBit(byte[] data, int pos, int val) { - if ((data.length * 8) - 1 < pos) throw new Error("outside byte array limit, pos: " + pos); + if ((data.length * 8) - 1 < pos) { + throw new Error("outside byte array limit, pos: " + pos); + } int posByte = data.length - 1 - (pos) / 8; int posBit = (pos) % 8; byte setter = (byte) (1 << (posBit)); byte toBeSet = data[posByte]; byte result; - if (val == 1) result = (byte) (toBeSet | setter); - else result = (byte) (toBeSet & ~setter); + if (val == 1) { + result = (byte) (toBeSet | setter); + } else { + result = (byte) (toBeSet & ~setter); + } data[posByte] = result; return data; @@ -387,7 +421,9 @@ public static byte[] setBit(byte[] data, int pos, int val) { public static int getBit(byte[] data, int pos) { - if ((data.length * 8) - 1 < pos) throw new Error("outside byte array limit, pos: " + pos); + if ((data.length * 8) - 1 < pos) { + throw new Error("outside byte array limit, pos: " + pos); + } int posByte = data.length - 1 - pos / 8; int posBit = pos % 8; @@ -396,7 +432,9 @@ public static int getBit(byte[] data, int pos) { } public static byte[] and(byte[] b1, byte[] b2) { - if (b1.length != b2.length) throw new RuntimeException("Array sizes differ"); + if (b1.length != b2.length) { + throw new RuntimeException("Array sizes differ"); + } byte[] ret = new byte[b1.length]; for (int i = 0; i < ret.length; i++) { ret[i] = (byte) (b1[i] & b2[i]); @@ -405,7 +443,9 @@ public static byte[] and(byte[] b1, byte[] b2) { } public static byte[] or(byte[] b1, byte[] b2) { - if (b1.length != b2.length) throw new RuntimeException("Array sizes differ"); + if (b1.length != b2.length) { + throw new RuntimeException("Array sizes differ"); + } byte[] ret = new byte[b1.length]; for (int i = 0; i < ret.length; i++) { ret[i] = (byte) (b1[i] | b2[i]); @@ -414,7 +454,9 @@ public static byte[] or(byte[] b1, byte[] b2) { } public static byte[] xor(byte[] b1, byte[] b2) { - if (b1.length != b2.length) throw new RuntimeException("Array sizes differ"); + if (b1.length != b2.length) { + throw new RuntimeException("Array sizes differ"); + } byte[] ret = new byte[b1.length]; for (int i = 0; i < ret.length; i++) { ret[i] = (byte) (b1[i] ^ b2[i]); @@ -481,7 +523,9 @@ public static Set difference(Set setA, Set setB) { break; } } - if (!found) result.add(elementA); + if (!found) { + result.add(elementA); + } } return result; @@ -574,9 +618,15 @@ public static byte[] shortToBytes(short n) { * @return decoded bytes array */ public static byte[] hexStringToBytes(String data) { - if (data == null) return EMPTY_BYTE_ARRAY; - if (data.startsWith("0x")) data = data.substring(2); - if (data.length() % 2 == 1) data = "0" + data; + if (data == null) { + return EMPTY_BYTE_ARRAY; + } + if (data.startsWith("0x")) { + data = data.substring(2); + } + if (data.length() % 2 == 1) { + data = "0" + data; + } return Hex.decode(data); } @@ -632,7 +682,9 @@ public static int numberOfLeadingZeros(byte[] bytes) { */ public static byte[] parseBytes(byte[] input, int offset, int len) { - if (offset >= input.length || len == 0) return EMPTY_BYTE_ARRAY; + if (offset >= input.length || len == 0) { + return EMPTY_BYTE_ARRAY; + } byte[] bytes = new byte[len]; System.arraycopy(input, offset, bytes, 0, Math.min(input.length - offset, len)); diff --git a/src/main/java/org/fisco/bcos/sdk/utils/Hex.java b/src/main/java/org/fisco/bcos/sdk/utils/Hex.java index c59015647..03684631c 100644 --- a/src/main/java/org/fisco/bcos/sdk/utils/Hex.java +++ b/src/main/java/org/fisco/bcos/sdk/utils/Hex.java @@ -29,7 +29,7 @@ public static String toHexString(byte[] data) { public static String toHexString(byte[] data, int off, int length) { byte[] encoded = encode(data, off, length); - return Strings.fromByteArray(encoded); + return StringUtils.fromByteArray(encoded); } /** diff --git a/src/main/java/org/fisco/bcos/sdk/utils/Numeric.java b/src/main/java/org/fisco/bcos/sdk/utils/Numeric.java index 83e2988cb..2c4ec7793 100644 --- a/src/main/java/org/fisco/bcos/sdk/utils/Numeric.java +++ b/src/main/java/org/fisco/bcos/sdk/utils/Numeric.java @@ -79,7 +79,7 @@ public static String prependHexPrefix(String input) { } public static boolean containsHexPrefix(String input) { - return !Strings.isEmpty(input) + return !StringUtils.isEmpty(input) && input.length() > 1 && input.charAt(0) == '0' && input.charAt(1) == 'x'; @@ -121,7 +121,7 @@ public static String toHexStringWithPrefixZeroPadded(BigInteger value, int size) public static String toHexStringWithPrefixSafe(BigInteger value) { String result = toHexStringNoPrefix(value); if (result.length() < 2) { - result = Strings.zeros(1) + result; + result = StringUtils.zeros(1) + result; } return HEX_PREFIX + result; } @@ -142,7 +142,7 @@ private static String toHexStringZeroPadded(BigInteger value, int size, boolean } if (length < size) { - result = Strings.zeros(size - length) + result; + result = StringUtils.zeros(size - length) + result; } if (withPrefix) { diff --git a/src/main/java/org/fisco/bcos/sdk/utils/Strings.java b/src/main/java/org/fisco/bcos/sdk/utils/StringUtils.java similarity index 99% rename from src/main/java/org/fisco/bcos/sdk/utils/Strings.java rename to src/main/java/org/fisco/bcos/sdk/utils/StringUtils.java index d22ee72c9..72d846201 100644 --- a/src/main/java/org/fisco/bcos/sdk/utils/Strings.java +++ b/src/main/java/org/fisco/bcos/sdk/utils/StringUtils.java @@ -20,9 +20,9 @@ import java.util.Vector; /** String utility functions. */ -public class Strings { +public class StringUtils { - private Strings() {} + private StringUtils() {} public static String toCsv(List src) { // return src == null ? null : String.join(", ", src.toArray(new String[0])); From a2daf8e03ec0367dea449c80a10080edf39e1f7b Mon Sep 17 00:00:00 2001 From: maggie Date: Fri, 24 Jul 2020 18:59:51 +0800 Subject: [PATCH 2/2] add decoder of EventLog --- .../fisco/bcos/sdk/network/ConnectTest.java | 2 +- .../bcos/sdk/eventsub/EventCallback.java | 28 +++++-- .../bcos/sdk/eventsub/EventSubscribeImp.java | 23 +++++- .../eventsub/filter/EventPushMsgHandler.java | 13 +++- .../sdk/eventsub/filter/FilterManager.java | 18 +++++ .../eventsub/filter/ScheduleTimeConfig.java | 21 +++++ .../bcos/sdk/model/EventResultEntity.java | 18 +++++ .../org/fisco/bcos/sdk/model/LogResult.java | 44 +++++++++++ .../bcos/sdk/network/ChannelHandler.java | 1 - .../bcos/sdk/network/ConnectionManager.java | 10 +++ .../org/fisco/bcos/sdk/network/Network.java | 7 ++ .../fisco/bcos/sdk/network/NetworkImp.java | 5 ++ .../fisco/bcos/sdk/network/TimeoutConfig.java | 1 - .../org/fisco/bcos/sdk/network/CodecTest.java | 78 +++++++++++++++++++ 14 files changed, 255 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/fisco/bcos/sdk/eventsub/filter/ScheduleTimeConfig.java create mode 100644 src/main/java/org/fisco/bcos/sdk/model/EventResultEntity.java create mode 100644 src/main/java/org/fisco/bcos/sdk/model/LogResult.java create mode 100644 src/test/java/org/fisco/bcos/sdk/network/CodecTest.java diff --git a/src/integration-test/java/org/fisco/bcos/sdk/network/ConnectTest.java b/src/integration-test/java/org/fisco/bcos/sdk/network/ConnectTest.java index b2ca89f15..4ce1e64c2 100644 --- a/src/integration-test/java/org/fisco/bcos/sdk/network/ConnectTest.java +++ b/src/integration-test/java/org/fisco/bcos/sdk/network/ConnectTest.java @@ -46,7 +46,7 @@ public void onDisconnect(ChannelHandlerContext ctx) { Network network = Network.build(config,new TestMsgHandler()); try{ network.start(); - Thread.sleep(4000); + Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); fail("Exception is not expected"); diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java index 45bddcc8b..f095a3838 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java @@ -18,7 +18,9 @@ import java.math.BigInteger; import java.util.List; import org.fisco.bcos.sdk.model.EventLog; +import org.fisco.bcos.sdk.model.LogResult; +/** Event callback */ public abstract class EventCallback { private BigInteger lastBlockNumber = null; private long logCount = 0; @@ -27,21 +29,35 @@ public BigInteger getLastBlockNumber() { return lastBlockNumber; } - public void updateCountsAndLatestBlock(List logs) { + public void updateCountsAndLatestBlock(List logs) { if (logs.isEmpty()) { return; } - EventLog latestOne = logs.get(logs.size() - 1); + LogResult latestOne = logs.get(logs.size() - 1); if (lastBlockNumber == null) { - lastBlockNumber = latestOne.getBlockNumber(); + lastBlockNumber = latestOne.getLog().getBlockNumber(); logCount += logs.size(); } else { - if (latestOne.getBlockNumber().compareTo(lastBlockNumber) > 0) { - lastBlockNumber = latestOne.getBlockNumber(); + if (latestOne.getLog().getBlockNumber().compareTo(lastBlockNumber) > 0) { + lastBlockNumber = latestOne.getLog().getBlockNumber(); logCount += logs.size(); } } } - public abstract void onReceiveLog(int status, List logs); + /** + * according to the abi, decode the log. + * + * @param log log receive from peer + * @return decoded log result + */ + public abstract LogResult decodeLog(EventLog log); + + /** + * onReceiveLog called when sdk receive any response of the target subscription. + * + * @param status the status that peer response to sdk. + * @param logs logs from the message. + */ + public abstract void onReceiveLog(int status, List logs); } diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java index e079492e4..f794e3eaa 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.fisco.bcos.sdk.channel.Channel; import org.fisco.bcos.sdk.channel.ResponseCallback; import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter; @@ -25,6 +27,7 @@ import org.fisco.bcos.sdk.eventsub.filter.EventPushMsgHandler; import org.fisco.bcos.sdk.eventsub.filter.EventSubNodeRespStatus; import org.fisco.bcos.sdk.eventsub.filter.FilterManager; +import org.fisco.bcos.sdk.eventsub.filter.ScheduleTimeConfig; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.model.MsgType; import org.fisco.bcos.sdk.model.Response; @@ -39,6 +42,7 @@ public class EventSubscribeImp implements EventSubscribe { private FilterManager filterManager; private EventPushMsgHandler msgHander; private boolean running = false; + ScheduledThreadPoolExecutor resendSchedule = new ScheduledThreadPoolExecutor(1); public EventSubscribeImp(Channel ch, String groupId) { this.ch = ch; @@ -77,13 +81,27 @@ public void start() { if (running) { return; } - running = true; + resendSchedule.scheduleAtFixedRate( + () -> { + resendWaitingFilters(); + }, + 0, + ScheduleTimeConfig.resendFrequency, + TimeUnit.SECONDS); } @Override public void stop() { - // todo + resendSchedule.shutdown(); + } + + private void resendWaitingFilters() { + List filters = filterManager.getWaitingReqFilters(); + for (EventLogFilter filter : filters) { + sendFilter(filter); + } + logger.info("Resend waiting filters, size: {}", filters.size()); } private void sendFilter(EventLogFilter filter) { @@ -145,7 +163,6 @@ public void onResponse(Response response) { response.getContent()); try { if (0 == response.getErrorCode()) { - // todo add success logic EventLogResponse resp = ObjectMapperFactory.getObjectMapper() .readValue(response.getContent(), EventLogResponse.class); diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java index 752e08661..6d4380148 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/EventPushMsgHandler.java @@ -17,9 +17,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; import java.util.List; import org.fisco.bcos.sdk.eventsub.EventCallback; import org.fisco.bcos.sdk.model.EventLog; +import org.fisco.bcos.sdk.model.LogResult; import org.fisco.bcos.sdk.model.Message; import org.fisco.bcos.sdk.network.MsgHandler; import org.fisco.bcos.sdk.utils.ObjectMapperFactory; @@ -66,9 +68,16 @@ public void onMessage(ChannelHandlerContext ctx, Message msg) { if (resp.getResult() == EventSubNodeRespStatus.SUCCESS.getStatus()) { List logs = resp.getLogs(); if (!logs.isEmpty()) { - callback.onReceiveLog(resp.getResult(), logs); + List logResults = new ArrayList<>(); + for (EventLog log : logs) { + LogResult decodedLog = callback.decodeLog(log); + if (decodedLog != null) { + logResults.add(decodedLog); + } + } + callback.onReceiveLog(resp.getResult(), logResults); // update status - callback.updateCountsAndLatestBlock(logs); + callback.updateCountsAndLatestBlock(logResults); logger.info( " log size: {}, blocknumber: {}", logs.size(), diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java index 2e840375a..ac72105f4 100644 --- a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/FilterManager.java @@ -81,4 +81,22 @@ public void updateFilterStatus( public EventCallback getCallBack(String filterID) { return filterID2Callback.get(filterID); } + + public List getWaitingReqFilters() { + List filters = new ArrayList(); + synchronized (this) { + for (EventLogFilter filter : regId2Filter.values()) { + if (filter.getStatus() == EventLogFilterStatus.WAITING_REQUEST) { + logger.info( + " resend filter, update event filter status: {}, registerID: {}, filter: {}", + filter.getStatus(), + filter.getRegisterID(), + filter); + filters.add(filter); + filter.setStatus(EventLogFilterStatus.WAITING_RESPONSE); + } + } + } + return filters; + } } diff --git a/src/main/java/org/fisco/bcos/sdk/eventsub/filter/ScheduleTimeConfig.java b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/ScheduleTimeConfig.java new file mode 100644 index 000000000..59c8d3310 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/eventsub/filter/ScheduleTimeConfig.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.eventsub.filter; + +/** Resend task time cofing */ +public class ScheduleTimeConfig { + public static long resendFrequency = (long) 10000; +} diff --git a/src/main/java/org/fisco/bcos/sdk/model/EventResultEntity.java b/src/main/java/org/fisco/bcos/sdk/model/EventResultEntity.java new file mode 100644 index 000000000..a67340bb1 --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/model/EventResultEntity.java @@ -0,0 +1,18 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.model; + +public class EventResultEntity {} diff --git a/src/main/java/org/fisco/bcos/sdk/model/LogResult.java b/src/main/java/org/fisco/bcos/sdk/model/LogResult.java new file mode 100644 index 000000000..28783543f --- /dev/null +++ b/src/main/java/org/fisco/bcos/sdk/model/LogResult.java @@ -0,0 +1,44 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.model; + +import java.util.List; + +public class LogResult { + private List logParams; + private EventLog log; + + public List getLogParams() { + return logParams; + } + + public void setLogParams(List logParams) { + this.logParams = logParams; + } + + public EventLog getLog() { + return log; + } + + public void setLog(EventLog log) { + this.log = log; + } + + @Override + public String toString() { + return "LogResult [logParams=" + logParams + ", log=" + log + "]"; + } +} diff --git a/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java b/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java index 7123e6f3a..954fdc86f 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java +++ b/src/main/java/org/fisco/bcos/sdk/network/ChannelHandler.java @@ -35,7 +35,6 @@ public class ChannelHandler extends SimpleChannelInboundHandler { private ConnectionManager connectionManager; public ChannelHandler(ConnectionManager connManager, MsgHandler msgHandler) { - // todo: add thread pool this.msgHandler = msgHandler; this.connectionManager = connManager; } diff --git a/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java b/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java index 5ecc84d30..fa7874e8e 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java +++ b/src/main/java/org/fisco/bcos/sdk/network/ConnectionManager.java @@ -336,4 +336,14 @@ protected void removeConnectionContext(String ip, int port, ChannelHandlerContex System.identityHashCode(ctx)); } } + + protected void removeConnection(String peerIpPort) { + for (ConnectionInfo conn : connectionInfoList) { + String ipPort = conn.getIp() + ":" + conn.getPort(); + if (ipPort.equals(peerIpPort)) { + connectionInfoList.remove(conn); + return; + } + } + } } diff --git a/src/main/java/org/fisco/bcos/sdk/network/Network.java b/src/main/java/org/fisco/bcos/sdk/network/Network.java index 3ba067914..cc0c1d4c7 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/Network.java +++ b/src/main/java/org/fisco/bcos/sdk/network/Network.java @@ -70,6 +70,13 @@ static Network build(ConfigOption config, MsgHandler handler) { */ Map getAvailableConnections(); + /** + * Remove the connection if version negotiation failed + * + * @param peerIpPort + */ + void removeConnection(String peerIpPort); + /** Exit gracefully */ void stop(); } diff --git a/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java b/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java index 7e768a788..7620b3385 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java +++ b/src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java @@ -76,6 +76,11 @@ public Map getAvailableConnections() { return connManager.getAvailableConnections(); } + @Override + public void removeConnection(String peerIpPort) { + connManager.removeConnection(peerIpPort); + } + @Override public void stop() { connManager.stopReconnectSchedule(); diff --git a/src/main/java/org/fisco/bcos/sdk/network/TimeoutConfig.java b/src/main/java/org/fisco/bcos/sdk/network/TimeoutConfig.java index 60862ccf3..5f5d3cfbd 100644 --- a/src/main/java/org/fisco/bcos/sdk/network/TimeoutConfig.java +++ b/src/main/java/org/fisco/bcos/sdk/network/TimeoutConfig.java @@ -17,7 +17,6 @@ public class TimeoutConfig { public static long idleTimeout = (long) 10000; - public static long heartBeatDelay = (long) 2000; public static long reconnectDelay = (long) 20000; public static long connectTimeout = (long) 10000; public static long sslHandShakeTimeout = (long) 10000; diff --git a/src/test/java/org/fisco/bcos/sdk/network/CodecTest.java b/src/test/java/org/fisco/bcos/sdk/network/CodecTest.java new file mode 100644 index 000000000..e195c8217 --- /dev/null +++ b/src/test/java/org/fisco/bcos/sdk/network/CodecTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2014-2020 [fisco-dev] + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.fisco.bcos.sdk.network; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.fisco.bcos.sdk.model.Message; +import org.fisco.bcos.sdk.model.MsgType; +import org.junit.Assert; +import org.junit.Test; + +public class CodecTest { + String seq = UUID.randomUUID().toString().replaceAll("-", ""); + + @Test + public void testMsgDecoder() { + ByteBuf in = getEventMsgByteBuf(); + MessageDecoder decoder = new MessageDecoder(); + List out = new ArrayList<>(); + try { + decoder.decode(null, in, out); + Message msg = (Message) out.get(0); + Assert.assertEquals(seq, msg.getSeq()); + Assert.assertEquals(MsgType.EVENT_LOG_PUSH.ordinal(), msg.getType().intValue()); + Assert.assertEquals("data", new String(msg.getData())); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testMsgEncoder() { + MessageEncoder encoder = new MessageEncoder(); + ByteBuf out = Unpooled.buffer(); + try { + encoder.encode(null, getEventMsg(), out); + Assert.assertEquals(out, getEventMsgByteBuf()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private Message getEventMsg() { + Message msg = new Message(); + msg.setSeq(seq); + msg.setType((short) MsgType.EVENT_LOG_PUSH.ordinal()); + msg.setResult(0); + msg.setData("data".getBytes()); + return msg; + } + + private ByteBuf getEventMsgByteBuf() { + ByteBuf byteBuf = Unpooled.buffer(); + Message msg = new Message(); + msg.setSeq(seq); + msg.setType((short) MsgType.EVENT_LOG_PUSH.ordinal()); + msg.setResult(0); + msg.setData("data".getBytes()); + msg.encode(byteBuf); + return byteBuf; + } +}