diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java index 2cd22ad92b4..7a12248d76e 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java @@ -82,7 +82,8 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable private final long tradeDate; private final String depositTxId; - // hash get set in constructor from json of all the other data fields (with hash = null). + // Hash get set in constructor from json of all the other data fields (with hash = null). + @JsonExclude private final byte[] hash; // PB field signature_pub_key_bytes not used anymore from v0.6 on @@ -90,6 +91,7 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable // at the P2P network storage checks. The hash of the object will be used to verify if the data is valid. Any new // field in a class would break that hash and therefore break the storage mechanism. @Nullable + @JsonExclude private Map extraDataMap; public TradeStatistics2(OfferPayload offerPayload, @@ -152,12 +154,14 @@ public TradeStatistics2(OfferPayload.Direction direction, this.depositTxId = depositTxId; this.extraDataMap = ExtraDataMapValidator.getValidatedExtraDataMap(extraDataMap); - if (hash == null) - // We create hash from all fields excluding hash itself. We use json as simple data serialisation. - // tradeDate is different for both peers so we ignore it for hash. - this.hash = Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8)); - else - this.hash = hash; + this.hash = hash == null ? createHash() : hash; + } + + public byte[] createHash() { + // We create hash from all fields excluding hash itself. We use json as simple data serialisation. + // TradeDate is different for both peers so we ignore it for hash. ExtraDataMap is ignored as well as at + // software updates we might have different entries which would cause a different hash. + return Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8)); } @Override diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java index cb684ec9353..a06ca30c67b 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java @@ -29,12 +29,12 @@ import java.io.File; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; -import static com.google.common.base.Preconditions.checkArgument; - @Slf4j public class TradeStatistics2StorageService extends MapStoreService { private static final String FILE_NAME = "TradeStatistics2Store"; @@ -70,6 +70,19 @@ public boolean canHandle(PersistableNetworkPayload payload) { return payload instanceof TradeStatistics2; } + Collection cleanupMap(Collection collection) { + Map tempMap = new HashMap<>(); + // We recreate the hash as there have been duplicates from diff. extraMap entries introduced at software updates + collection.forEach(item -> tempMap.putIfAbsent(new P2PDataStorage.ByteArray(item.createHash()), item)); + + Map map = getMap(); + map.clear(); + map.putAll(tempMap); + persist(); + + return tempMap.values(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Protected @@ -83,8 +96,5 @@ protected TradeStatistics2Store createStore() { @Override protected void readStore() { super.readStore(); - checkArgument(store instanceof TradeStatistics2Store, - "Store is not instance of TradeStatistics2Store. That can happen if the ProtoBuffer " + - "file got changed. We clear the data store and recreated it again."); } } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index 3f40cd660bd..78d6b845044 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -43,11 +43,13 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -60,9 +62,11 @@ public class TradeStatisticsManager { private final JsonFileManager jsonFileManager; private final P2PService p2PService; private final PriceFeedService priceFeedService; + private final TradeStatistics2StorageService tradeStatistics2StorageService; private final ReferralIdService referralIdService; private final boolean dumpStatistics; private final ObservableSet observableTradeStatisticsSet = FXCollections.observableSet(); + private int duplicates = 0; @Inject public TradeStatisticsManager(P2PService p2PService, @@ -74,6 +78,7 @@ public TradeStatisticsManager(P2PService p2PService, @Named(AppOptionKeys.DUMP_STATISTICS) boolean dumpStatistics) { this.p2PService = p2PService; this.priceFeedService = priceFeedService; + this.tradeStatistics2StorageService = tradeStatistics2StorageService; this.referralIdService = referralIdService; this.dumpStatistics = dumpStatistics; jsonFileManager = new JsonFileManager(storageDir); @@ -97,16 +102,36 @@ public void onAllServicesInitialized() { p2PService.getP2PDataStorage().addAppendOnlyDataStoreListener(payload -> { if (payload instanceof TradeStatistics2) - addToMap((TradeStatistics2) payload, true); + addToSet((TradeStatistics2) payload); }); Map map = new HashMap<>(); + AtomicInteger origSize = new AtomicInteger(); p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().stream() .filter(e -> e instanceof TradeStatistics2) .map(e -> (TradeStatistics2) e) .filter(TradeStatistics2::isValid) - .forEach(e -> addToMap(e, map)); - observableTradeStatisticsSet.addAll(map.values()); + .forEach(tradeStatistics -> { + origSize.getAndIncrement(); + TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); + if (prevValue != null) { + duplicates++; + } + }); + + Collection items = map.values(); + // At startup we check if we have duplicate entries. This might be the case from software updates when we + // introduced new entries to the extraMap. As that map is for flexibility in updates we keep it excluded from + // json so that it will not cause duplicates anymore. Until all users have updated we keep the cleanup code. + // Should not be needed later anymore, but will also not hurt if no duplicates exist. + if (duplicates > 0) { + long ts = System.currentTimeMillis(); + items = tradeStatistics2StorageService.cleanupMap(items); + log.info("We found {} duplicate entries. Size of map entries before and after cleanup: {} / {}. Cleanup took {} ms.", + duplicates, origSize, items.size(), System.currentTimeMillis() - ts); + } + + observableTradeStatisticsSet.addAll(items); priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); @@ -131,7 +156,7 @@ public void publishTradeStatistics(List trades) { trade.getDate(), (trade.getDepositTx() != null ? trade.getDepositTx().getHashAsString() : ""), extraDataMap); - addToMap(tradeStatistics, true); + addToSet(tradeStatistics); // We only republish trades from last 10 days if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(10)) { @@ -149,30 +174,22 @@ public ObservableSet getObservableTradeStatisticsSet() { return observableTradeStatisticsSet; } - private void addToMap(TradeStatistics2 tradeStatistics, boolean storeLocally) { + private void addToSet(TradeStatistics2 tradeStatistics) { if (!observableTradeStatisticsSet.contains(tradeStatistics)) { - - if (observableTradeStatisticsSet.stream() - .anyMatch(e -> (e.getOfferId().equals(tradeStatistics.getOfferId())))) + if (observableTradeStatisticsSet.stream().anyMatch(e -> e.getOfferId().equals(tradeStatistics.getOfferId()))) { return; + } - if (!tradeStatistics.isValid()) + if (!tradeStatistics.isValid()) { return; + } observableTradeStatisticsSet.add(tradeStatistics); - if (storeLocally) { - priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); - } + priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); + dump(); } } - private void addToMap(TradeStatistics2 tradeStatistics, Map map) { - TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); - if (prevValue != null) - log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics"); - } - private void dump() { if (dumpStatistics) { // We store the statistics as json so it is easy for further processing (e.g. for web based services) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index c97dae0f5ce..b699138e5f5 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -58,6 +58,8 @@ @Slf4j class RequestDataHandler implements MessageListener { private static final long TIMEOUT = 90; + private static boolean initialRequestApplied = false; + private NodeAddress peersNodeAddress; /* */ @@ -240,7 +242,12 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { // Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min). // Usually we only get about a few hundred or max. a few 1000 items. 82645 is all // trade stats stats and all account age witness data. - dataStorage.addPersistableNetworkPayloadFromInitialRequest(e); + + // We only apply it once from first response + if (!initialRequestApplied) { + dataStorage.addPersistableNetworkPayloadFromInitialRequest(e); + initialRequestApplied = true; + } } else { // We don't broadcast here as we are only connected to the seed node and would be pointless dataStorage.addPersistableNetworkPayload(e, sender, false, diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index be956d0fb04..f01157f3fcc 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -348,7 +348,6 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // Overwriting an entry would be also no issue. We also skip notifying listeners as we get called before the domain // is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners // might be added then but as we have the data already added calling them would be irrelevant as well. - // TODO find a way to avoid the second call... public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) { byte[] hash = payload.getHash(); if (payload.verifyHashSize()) {