diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java index bb31911d2bd..49abada260c 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java @@ -27,6 +27,7 @@ import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.common.app.Capabilities; import bisq.common.app.Capability; @@ -48,6 +49,7 @@ import java.util.Date; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -65,11 +67,15 @@ @Slf4j @Value public final class TradeStatistics2 implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload, - CapabilityRequiringPayload, Comparable { + CapabilityRequiringPayload, Comparable, PrunablePersistableNetworkPayload { public static final String MEDIATOR_ADDRESS = "medAddr"; public static final String REFUND_AGENT_ADDRESS = "refAddr"; + // We keep only recent trade statistics data (max. 60 days old). Older data gets removed and will be + // handled by the historical trade statistics data framework. + private static final long INCLUSION_PERIOD = TimeUnit.DAYS.toMillis(60); + private final OfferPayload.Direction direction; private final String baseCurrency; private final String counterCurrency; @@ -246,6 +252,11 @@ public Capabilities getRequiredCapabilities() { return new Capabilities(Capability.TRADE_STATISTICS_HASH_UPDATE); } + @Override + public boolean doExclude() { + return System.currentTimeMillis() - tradeDate > INCLUSION_PERIOD; + } + /////////////////////////////////////////////////////////////////////////////////////////// // Getters diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java index c732a077f2c..2ae9b6deb76 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java @@ -19,23 +19,27 @@ import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.network.p2p.storage.persistence.MapStoreService; +import bisq.network.p2p.storage.persistence.PrunableStoreService; import bisq.common.config.Config; import bisq.common.storage.Storage; -import javax.inject.Named; - import javax.inject.Inject; +import javax.inject.Named; import java.io.File; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @Slf4j -public class TradeStatistics2StorageService extends MapStoreService { +public class TradeStatistics2StorageService extends MapStoreService + implements PrunableStoreService { private static final String FILE_NAME = "TradeStatistics2Store"; @@ -79,8 +83,27 @@ protected TradeStatistics2Store createStore() { return new TradeStatistics2Store(); } + // At startup we check our persisted data if it contains too old entries and remove those. + // This method is called from a non user thread. @Override - protected void readStore() { - super.readStore(); + public synchronized void prune() { + AtomicBoolean hasExcludedElements = new AtomicBoolean(false); + Map map = getMap(); + Map newMap = map.entrySet().stream() + .filter(e -> { + if (((PrunablePersistableNetworkPayload) e.getValue()).doExclude()) { + hasExcludedElements.set(true); + return false; + } else { + return true; + } + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (hasExcludedElements.get()) { + map.clear(); + map.putAll(newMap); + persist(); + } } } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index 4d90fc143c9..4f06d33f682 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -91,7 +91,7 @@ public void onAllServicesInitialized() { priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); + maybeDump(); } public ObservableSet getObservableTradeStatisticsSet() { @@ -99,35 +99,37 @@ public ObservableSet getObservableTradeStatisticsSet() { } private void addToSet(TradeStatistics2 tradeStatistics) { + if (observableTradeStatisticsSet.contains(tradeStatistics)) { + return; + } + + if (!tradeStatistics.isValid()) { + return; + } - if (!observableTradeStatisticsSet.contains(tradeStatistics)) { - Optional duplicate = observableTradeStatisticsSet.stream().filter( - e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny(); - - if (duplicate.isPresent()) { - // TODO: Can be removed as soon as everyone uses v1.2.6+ - // Removes an existing object with a trade id if the new one matches the existing except - // for the deposit tx id - if (tradeStatistics.getDepositTxId() == null && - tradeStatistics.isValid() && - duplicate.get().compareTo(tradeStatistics) == 0) { - observableTradeStatisticsSet.remove(duplicate.get()); - } else { - return; - } - } - if (!tradeStatistics.isValid()) { + // TODO remove that part + Optional duplicate = observableTradeStatisticsSet.stream().filter( + e -> e.getOfferId().equals(tradeStatistics.getOfferId())).findAny(); + if (duplicate.isPresent()) { + // TODO: Can be removed as soon as everyone uses v1.2.6+ + // Removes an existing object with a trade id if the new one matches the existing except + // for the deposit tx id + if (tradeStatistics.getDepositTxId() == null && + tradeStatistics.isValid() && + duplicate.get().compareTo(tradeStatistics) == 0) { + observableTradeStatisticsSet.remove(duplicate.get()); + } else { return; } - - observableTradeStatisticsSet.add(tradeStatistics); - priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); } + + observableTradeStatisticsSet.add(tradeStatistics); + priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); + maybeDump(); } - private void dump() { + private void maybeDump() { if (dumpStatistics) { ArrayList fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream() .map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8)) diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 34e29f3f5fa..1cf93b6743c 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -44,6 +44,7 @@ import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStoragePayload; +import bisq.network.p2p.storage.payload.PrunablePersistableNetworkPayload; import bisq.network.p2p.storage.payload.RequiresOwnerIsOnlinePayload; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; @@ -185,8 +186,12 @@ public synchronized void readFromResources(String postFix) { resourceDataStoreService.readFromResources(postFix); map.putAll(protectedDataStoreService.getMap()); + + // Prune in case we have a PrunableStoreService + appendOnlyDataStoreService.prune(); } + /////////////////////////////////////////////////////////////////////////////////////////// // RequestData API /////////////////////////////////////////////////////////////////////////////////////////// @@ -521,6 +526,12 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, return false; } + // If we receive data which are considered pruned from the seed node we ignore it + if (payload instanceof PrunablePersistableNetworkPayload && + ((PrunablePersistableNetworkPayload) payload).doExclude()) { + return false; + } + // Add the payload and publish the state update to the appendOnlyDataStoreListeners if (!payloadHashAlreadyInStore) { appendOnlyDataStoreService.put(hashAsByteArray, payload); @@ -540,13 +551,20 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners // might be added then but as we have the data already added calling them would be irrelevant as well. private void addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) { - byte[] hash = payload.getHash(); - if (payload.verifyHashSize()) { - ByteArray hashAsByteArray = new ByteArray(hash); - appendOnlyDataStoreService.put(hashAsByteArray, payload); - } else { - log.warn("We got a hash exceeding our permitted size"); + if (!payload.verifyHashSize()) { + log.warn("We got a hash not matching our defined size"); + return; } + + // If we receive data which are considered pruned from the seed node we ignore it + if (payload instanceof PrunablePersistableNetworkPayload && + ((PrunablePersistableNetworkPayload) payload).doExclude()) { + return; + } + + byte[] hash = payload.getHash(); + ByteArray hashAsByteArray = new ByteArray(hash); + appendOnlyDataStoreService.put(hashAsByteArray, payload); } /** diff --git a/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java new file mode 100644 index 00000000000..82b268d800f --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/storage/payload/PrunablePersistableNetworkPayload.java @@ -0,0 +1,25 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.storage.payload; + +/** + * Interface for PersistableNetworkPayloads which can be pruned (e.g. old objects will be removed from data store). + */ +public interface PrunablePersistableNetworkPayload extends PersistableNetworkPayload { + boolean doExclude(); +} diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java index c7ea20c9438..b64a40e258f 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java @@ -66,4 +66,11 @@ public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayl .filter(service -> service.canHandle(payload)) .forEach(service -> service.putIfAbsent(hashAsByteArray, payload)); } + + public void prune() { + services.stream() + .filter(e -> e instanceof PrunableStoreService) + .map((e -> (PrunableStoreService) e)) + .forEach(PrunableStoreService::prune); + } } diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java new file mode 100644 index 00000000000..07af3fbf51c --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/PrunableStoreService.java @@ -0,0 +1,25 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.storage.persistence; + +/** + * Interface for StoreService which can be pruned (e.g. old objects will be removed from data store). + */ +public interface PrunableStoreService { + void prune(); +}