Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void onDisconnect(ChannelHandlerContext ctx) {
Network network = Network.build(config,new TestMsgHandler());
try{
network.start();
Thread.sleep(4000);
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
fail("Exception is not expected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.fisco.bcos.sdk.crypto.hash.Hash;
import org.fisco.bcos.sdk.utils.Hex;
import org.fisco.bcos.sdk.utils.Numeric;
import org.fisco.bcos.sdk.utils.Strings;
import org.fisco.bcos.sdk.utils.StringUtils;
import org.fisco.bcos.sdk.utils.exceptions.DecoderException;

public abstract class CryptoKeyPair {
Expand Down Expand Up @@ -146,7 +146,7 @@ protected String getPublicKeyNoPrefix(String publicKeyStr) {
// Hexadecimal public key length is less than 128, add 0 in front
if (publicKeyNoPrefix.length() < PUBLIC_KEY_LENGTH_IN_HEX) {
publicKeyNoPrefix =
Strings.zeros(PUBLIC_KEY_LENGTH_IN_HEX - publicKeyNoPrefix.length())
StringUtils.zeros(PUBLIC_KEY_LENGTH_IN_HEX - publicKeyNoPrefix.length())
+ publicKeyNoPrefix;
}
return publicKeyNoPrefix;
Expand Down
47 changes: 46 additions & 1 deletion src/main/java/org/fisco/bcos/sdk/eventsub/EventCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,49 @@

package org.fisco.bcos.sdk.eventsub;

public abstract class EventCallback {}
import java.math.BigInteger;
import java.util.List;
import org.fisco.bcos.sdk.model.EventLog;
import org.fisco.bcos.sdk.model.LogResult;

/** Event callback */
public abstract class EventCallback {
private BigInteger lastBlockNumber = null;
private long logCount = 0;

public BigInteger getLastBlockNumber() {
return lastBlockNumber;
}

public void updateCountsAndLatestBlock(List<LogResult> logs) {
if (logs.isEmpty()) {
return;
}
LogResult latestOne = logs.get(logs.size() - 1);
if (lastBlockNumber == null) {
lastBlockNumber = latestOne.getLog().getBlockNumber();
logCount += logs.size();
} else {
if (latestOne.getLog().getBlockNumber().compareTo(lastBlockNumber) > 0) {
lastBlockNumber = latestOne.getLog().getBlockNumber();
logCount += logs.size();
}
}
}

/**
* according to the abi, decode the log.
*
* @param log log receive from peer
* @return decoded log result
*/
public abstract LogResult decodeLog(EventLog log);

/**
* onReceiveLog called when sdk receive any response of the target subscription.
*
* @param status the status that peer response to sdk.
* @param logs logs from the message.
*/
public abstract void onReceiveLog(int status, List<LogResult> logs);
}
84 changes: 83 additions & 1 deletion src/main/java/org/fisco/bcos/sdk/eventsub/EventLogParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,86 @@

package org.fisco.bcos.sdk.eventsub;

public class EventLogParams {}
import java.util.List;
import org.fisco.bcos.sdk.utils.AddressUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventLogParams {
private static Logger logger = LoggerFactory.getLogger(EventLogParams.class);
private String fromBlock;
private String toBlock;
private List<String> addresses;
private List<Object> topics;

public static Logger getLogger() {
return logger;
}

public static void setLogger(Logger logger) {
EventLogParams.logger = logger;
}

public String getFromBlock() {
return fromBlock;
}

public void setFromBlock(String fromBlock) {
this.fromBlock = fromBlock;
}

public String getToBlock() {
return toBlock;
}

public void setToBlock(String toBlock) {
this.toBlock = toBlock;
}

public List<String> getAddresses() {
return addresses;
}

public void setAddresses(List<String> addresses) {
this.addresses = addresses;
}

public List<Object> getTopics() {
return topics;
}

public void setTopics(List<Object> topics) {
this.topics = topics;
}

@Override
public String toString() {
return "EventLogFilterParams [fromBlock="
+ fromBlock
+ ", toBlock="
+ toBlock
+ ", addresses="
+ addresses
+ ", topics="
+ topics
+ "]";
}

public boolean validAddresses() {
if (getAddresses() == null) {
return false;
}
for (String address : getAddresses()) {
// check if address valid
if (!AddressUtils.isValidAddress(address)) {
return false;
}
}
return true;
}

public boolean valid() {
// todo add other verifications
return validAddresses();
}
}
13 changes: 10 additions & 3 deletions src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package org.fisco.bcos.sdk.eventsub;

import java.util.List;
import java.util.UUID;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter;

/**
* Event subscribe interface.
Expand All @@ -32,7 +34,12 @@ public interface EventSubscribe {
* @return EventSubscribe Object
*/
static EventSubscribe build(Channel ch, String groupId) {
return null;
return new EventSubscribeImp(ch, groupId);
}

static String newSeq() {
String seq = UUID.randomUUID().toString().replaceAll("-", "");
return seq;
}

/**
Expand All @@ -46,9 +53,9 @@ static EventSubscribe build(Channel ch, String groupId) {
/**
* Unsubscribe events
*
* @param filterId
* @param filterID
*/
void unsubscribeEvent(String filterId);
void unsubscribeEvent(String filterID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该是filterId吧,和groupId一样保持命名风格


/**
* Get all subscribed event.
Expand Down
197 changes: 197 additions & 0 deletions src/main/java/org/fisco/bcos/sdk/eventsub/EventSubscribeImp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright 2014-2020 [fisco-dev]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.fisco.bcos.sdk.eventsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilterStatus;
import org.fisco.bcos.sdk.eventsub.filter.EventLogResponse;
import org.fisco.bcos.sdk.eventsub.filter.EventPushMsgHandler;
import org.fisco.bcos.sdk.eventsub.filter.EventSubNodeRespStatus;
import org.fisco.bcos.sdk.eventsub.filter.FilterManager;
import org.fisco.bcos.sdk.eventsub.filter.ScheduleTimeConfig;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSubscribeImp implements EventSubscribe {
private static final Logger logger = LoggerFactory.getLogger(EventSubscribeImp.class);
private Channel ch;
private String groupId;
private FilterManager filterManager;
private EventPushMsgHandler msgHander;
private boolean running = false;
ScheduledThreadPoolExecutor resendSchedule = new ScheduledThreadPoolExecutor(1);

public EventSubscribeImp(Channel ch, String groupId) {
this.ch = ch;
this.groupId = groupId;
filterManager = new FilterManager();
msgHander = new EventPushMsgHandler(filterManager);
ch.addMessageHandler(MsgType.EVENT_LOG_PUSH, msgHander);
}

@Override
public void subscribeEvent(EventLogParams params, EventCallback callback) {
if (!params.valid()) {
callback.onReceiveLog(EventSubNodeRespStatus.INVALID_PARAMS.getStatus(), null);
return;
}
EventLogFilter filter = new EventLogFilter();
filter.setRegisterID(EventSubscribe.newSeq());
filter.setParams(params);
filter.setCallback(callback);
filterManager.addFilter(filter);
sendFilter(filter);
}

@Override
public void unsubscribeEvent(String filterID) {
// Todo need node support
}

@Override
public List<EventLogFilter> getAllSubscribedEvent() {
return filterManager.getAllSubscribedEvent();
}

@Override
public void start() {
if (running) {
return;
}
running = true;
resendSchedule.scheduleAtFixedRate(
() -> {
resendWaitingFilters();
},
0,
ScheduleTimeConfig.resendFrequency,
TimeUnit.SECONDS);
}

@Override
public void stop() {
resendSchedule.shutdown();
}

private void resendWaitingFilters() {
List<EventLogFilter> filters = filterManager.getWaitingReqFilters();
for (EventLogFilter filter : filters) {
sendFilter(filter);
}
logger.info("Resend waiting filters, size: {}", filters.size());
}

private void sendFilter(EventLogFilter filter) {
Message msg = new Message();
msg.setSeq(EventSubscribe.newSeq());
msg.setType((short) MsgType.CLIENT_REGISTER_EVENT_LOG.ordinal());
msg.setResult(0);
try {
String content = filter.getNewParamJsonString(groupId);
msg.setData(content.getBytes());
} catch (JsonProcessingException e) {
logger.error(
"send filter error, registerID: {},filterID : {}, error: {}",
filter.getRegisterID(),
filter.getFilterID(),
e.getMessage());
logger.error(
"remove bad filter , registerID: {},filterID : {}",
filter.getRegisterID(),
filter.getFilterID());
filterManager.removeFilter(filter.getRegisterID());
}

filterManager.addCallback(filter.getFilterID(), filter.getCallback());

// Todo check send to group function. this function may not in channel module.
ch.asyncSendToGroup(
msg,
groupId,
new RegisterEventSubRespCallback(
filterManager, filter, filter.getFilterID(), filter.getRegisterID()));
}

class RegisterEventSubRespCallback extends ResponseCallback {
FilterManager filterManager;
EventLogFilter filter;
String filterID;
String registerID;

public RegisterEventSubRespCallback(
FilterManager filterManager,
EventLogFilter filter,
String filterID,
String registerID) {
this.filterManager = filterManager;
this.filter = filter;
this.filterID = filterID;
this.registerID = registerID;
}

@Override
public void onResponse(Response response) {
logger.info(
" event filter callback response, registerID: {}, filterID: {}, seq: {}, error code: {}, content: {}",
registerID,
filterID,
response.getMessageID(),
response.getErrorCode(),
response.getContent());
try {
if (0 == response.getErrorCode()) {
EventLogResponse resp =
ObjectMapperFactory.getObjectMapper()
.readValue(response.getContent(), EventLogResponse.class);
if (resp.getResult() == 0) {
// node give an "OK" response, event log will be pushed soon
filterManager.updateFilterStatus(
filter, EventLogFilterStatus.EVENT_LOG_PUSHING, null);
} else {
// node give a bad response, will not push event log, trigger callback
filter.getCallback().onReceiveLog(resp.getResult(), null);
filterManager.removeFilter(registerID);
filterManager.removeCallback(filterID);
}
} else {
filterManager.updateFilterStatus(
filter, EventLogFilterStatus.WAITING_REQUEST, null);
filterManager.removeCallback(filterID);
}
} catch (Exception e) {
logger.error(
" event filter response message exception, filterID: {}, registerID: {}, exception message: {}",
filterID,
registerID,
e.getMessage());
filter.getCallback()
.onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
filterManager.removeFilter(registerID);
filterManager.removeCallback(filterID);
}
}
}
}
Loading