Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix performance issue in BsqWalletService #3177

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 52 additions & 12 deletions core/src/main/java/bisq/core/btc/wallet/BsqWalletService.java
Expand Up @@ -35,6 +35,8 @@
import bisq.core.provider.fee.FeeService;
import bisq.core.user.Preferences;

import bisq.common.UserThread;

import org.bitcoinj.core.Address;
import org.bitcoinj.core.AddressFormatException;
import org.bitcoinj.core.BlockChain;
Expand All @@ -57,15 +59,14 @@

import javax.inject.Inject;

import javafx.collections.FXCollections;
import javafx.collections.ObservableList;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -80,12 +81,20 @@

@Slf4j
public class BsqWalletService extends WalletService implements DaoStateListener {

public interface WalletTransactionsChangeListener {

void onWalletTransactionsChange();
}

private final BsqCoinSelector bsqCoinSelector;
private final NonBsqCoinSelector nonBsqCoinSelector;
private final DaoStateService daoStateService;
private final UnconfirmedBsqChangeOutputListService unconfirmedBsqChangeOutputListService;
private final ObservableList<Transaction> walletTransactions = FXCollections.observableArrayList();
private final List<Transaction> walletTransactions = new ArrayList<>();
private final CopyOnWriteArraySet<BsqBalanceListener> bsqBalanceListeners = new CopyOnWriteArraySet<>();
private final List<WalletTransactionsChangeListener> walletTransactionsChangeListeners = new ArrayList<>();
private boolean updateBsqWalletTransactionsPending;

// balance of non BSQ satoshis
@Getter
Expand Down Expand Up @@ -152,7 +161,13 @@ public void onReorganize(Wallet wallet) {

@Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
updateBsqWalletTransactions();
// We are only interested in updates from unconfirmed txs and confirmed txs at the
// time when it gets into a block. Otherwise we would get called
// updateBsqWalletTransactions for each tx as the block depth changes for all.
if (tx.getConfidence().getDepthInBlocks() <= 1 &&
daoStateService.isParseBlockChainComplete()) {
updateBsqWalletTransactions();
}
unconfirmedBsqChangeOutputListService.onTransactionConfidenceChanged(tx);
}

Expand Down Expand Up @@ -215,6 +230,7 @@ String getWalletAsString(boolean includePrivKeys) {
///////////////////////////////////////////////////////////////////////////////////////////

private void updateBsqBalance() {
long ts = System.currentTimeMillis();
unverifiedBalance = Coin.valueOf(
getTransactions(false).stream()
.filter(tx -> tx.getConfidence().getConfidenceType() == PENDING)
Expand Down Expand Up @@ -246,7 +262,7 @@ private void updateBsqBalance() {
}
return false;
})
.mapToLong(in -> in != null ? in.getValue().value : 0)
.mapToLong(in -> in.getValue() != null ? in.getValue().value : 0)
.sum();
return outputs - lockedInputs;
})
Expand Down Expand Up @@ -289,6 +305,7 @@ private void updateBsqBalance() {

bsqBalanceListeners.forEach(e -> e.onUpdateBalances(availableConfirmedBalance, availableNonBsqBalance, unverifiedBalance,
unconfirmedChangeBalance, lockedForVotingBalance, lockupBondsBalance, unlockingBondsBalance));
log.info("updateBsqBalance took {} ms", System.currentTimeMillis() - ts);
}

public void addBsqBalanceListener(BsqBalanceListener listener) {
Expand All @@ -299,13 +316,21 @@ public void removeBsqBalanceListener(BsqBalanceListener listener) {
bsqBalanceListeners.remove(listener);
}

public void addWalletTransactionsChangeListener(WalletTransactionsChangeListener listener) {
walletTransactionsChangeListeners.add(listener);
}

public void removeWalletTransactionsChangeListener(WalletTransactionsChangeListener listener) {
walletTransactionsChangeListeners.remove(listener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// BSQ TransactionOutputs and Transactions
///////////////////////////////////////////////////////////////////////////////////////////

public ObservableList<Transaction> getWalletTransactions() {
return walletTransactions;
public List<Transaction> getClonedWalletTransactions() {
return new ArrayList<>(walletTransactions);
}

public Stream<Transaction> getPendingWalletTransactionsStream() {
Expand All @@ -314,9 +339,21 @@ public Stream<Transaction> getPendingWalletTransactionsStream() {
}

private void updateBsqWalletTransactions() {
walletTransactions.setAll(getTransactions(false));
if (daoStateService.isParseBlockChainComplete()) {
updateBsqBalance();
// We get called updateBsqWalletTransactions multiple times from onWalletChanged, onTransactionConfidenceChanged
// and from onParseBlockCompleteAfterBatchProcessing. But as updateBsqBalance is an expensive operation we do
// not want to call it in a short interval series so we use a flag and a delay to not call it multiple times
// in a 100 ms period.
if (!updateBsqWalletTransactionsPending) {
updateBsqWalletTransactionsPending = true;
UserThread.runAfter(() -> {
walletTransactions.clear();
walletTransactions.addAll(getTransactions(false));
walletTransactionsChangeListeners.forEach(WalletTransactionsChangeListener::onWalletTransactionsChange);
updateBsqBalance();
updateBsqWalletTransactionsPending = false;
}, 100, TimeUnit.MILLISECONDS);
}
}
}

Expand Down Expand Up @@ -434,7 +471,7 @@ public Coin getValueSentToMeForTransaction(Transaction transaction) throws Scrip
}

public Optional<Transaction> isWalletTransaction(String txId) {
return getWalletTransactions().stream().filter(e -> e.getHashAsString().equals(txId)).findAny();
return walletTransactions.stream().filter(e -> e.getHashAsString().equals(txId)).findAny();
}


Expand Down Expand Up @@ -553,7 +590,10 @@ private Transaction getPreparedBurnFeeTx(Coin fee, boolean requireChangeOutput)
return tx;
}

private void addInputsAndChangeOutputForTx(Transaction tx, Coin fee, BsqCoinSelector bsqCoinSelector, boolean requireChangeOutput)
private void addInputsAndChangeOutputForTx(Transaction tx,
Coin fee,
BsqCoinSelector bsqCoinSelector,
boolean requireChangeOutput)
throws InsufficientBsqException {
Coin requiredInput;
// If our fee is less then dust limit we increase it so we are sure to not get any dust output.
Expand Down
Expand Up @@ -35,7 +35,6 @@
import javax.inject.Inject;

import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;

import java.util.Arrays;
Expand All @@ -55,7 +54,8 @@
* unconfirmed txs.
*/
@Slf4j
public abstract class BondRepository<T extends Bond, R extends BondedAsset> implements DaoSetupService {
public abstract class BondRepository<T extends Bond, R extends BondedAsset> implements DaoSetupService,
BsqWalletService.WalletTransactionsChangeListener {

///////////////////////////////////////////////////////////////////////////////////////////
// Static
Expand Down Expand Up @@ -161,7 +161,7 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
update();
}
});
bsqWalletService.getWalletTransactions().addListener((ListChangeListener<Transaction>) c -> update());
bsqWalletService.addWalletTransactionsChangeListener(this);
}

@Override
Expand All @@ -170,6 +170,16 @@ public void start() {
}


///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onWalletTransactionsChange() {
update();
}


///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -195,6 +205,7 @@ public List<Bond> getActiveBonds() {
abstract protected Stream<R> getBondedAssetStream();

protected void update() {
log.debug("update");
getBondedAssetStream().forEach(bondedAsset -> {
String uid = bondedAsset.getUid();
bondByUidMap.putIfAbsent(uid, createBond(bondedAsset));
Expand Down
Expand Up @@ -26,12 +26,9 @@
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;

import org.bitcoinj.core.Transaction;

import javax.inject.Inject;

import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;

import java.util.Arrays;
Expand All @@ -50,7 +47,7 @@
* unconfirmed txs.
*/
@Slf4j
public class MyBondedReputationRepository implements DaoSetupService {
public class MyBondedReputationRepository implements DaoSetupService, BsqWalletService.WalletTransactionsChangeListener {
private final DaoStateService daoStateService;
private final BsqWalletService bsqWalletService;
private final MyReputationListService myReputationListService;
Expand Down Expand Up @@ -84,19 +81,30 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
update();
}
});
bsqWalletService.getWalletTransactions().addListener((ListChangeListener<Transaction>) c -> update());
bsqWalletService.addWalletTransactionsChangeListener(this);
}

@Override
public void start() {
}


///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onWalletTransactionsChange() {
update();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private void update() {
log.debug("update");
// It can be that the same salt/hash is in several lockupTxs, so we use the bondByLockupTxIdMap to eliminate
// duplicates by the collection algorithm.
Map<String, MyBondedReputation> bondByLockupTxIdMap = new HashMap<>();
Expand Down
Expand Up @@ -60,7 +60,7 @@ public BondedRolesRepository(DaoStateService daoStateService, BsqWalletService b
///////////////////////////////////////////////////////////////////////////////////////////

public boolean isMyRole(Role role) {
Set<String> myWalletTransactionIds = bsqWalletService.getWalletTransactions().stream()
Set<String> myWalletTransactionIds = bsqWalletService.getClonedWalletTransactions().stream()
.map(Transaction::getHashAsString)
.collect(Collectors.toSet());
return getAcceptedBondedRoleProposalStream()
Expand Down
Expand Up @@ -71,21 +71,20 @@
import javafx.beans.value.ChangeListener;

import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.collections.transformation.SortedList;

import javafx.util.Callback;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@FxmlView
public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBalanceListener, DaoStateListener {
public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBalanceListener, DaoStateListener,
BsqWalletService.WalletTransactionsChangeListener {

private TableView<BsqTxListItem> tableView;

Expand All @@ -100,7 +99,6 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
private final ObservableList<BsqTxListItem> observableList = FXCollections.observableArrayList();
// Need to be DoubleProperty as we pass it as reference
private final SortedList<BsqTxListItem> sortedList = new SortedList<>(observableList);
private ListChangeListener<Transaction> walletBsqTransactionsListener;
private int gridRow = 0;
private Label chainHeightLabel;
private ProgressBar chainSyncIndicator;
Expand Down Expand Up @@ -173,7 +171,6 @@ public void initialize() {
VBox.setVgrow(tableView, Priority.ALWAYS);
root.getChildren().add(vBox);

walletBsqTransactionsListener = change -> updateList();
walletChainHeightListener = (observable, oldValue, newValue) -> {
walletChainHeight = bsqWalletService.getBestChainHeight();
onUpdateAnyChainHeight();
Expand All @@ -183,7 +180,7 @@ public void initialize() {
@Override
protected void activate() {
bsqBalanceUtil.activate();
bsqWalletService.getWalletTransactions().addListener(walletBsqTransactionsListener);
bsqWalletService.addWalletTransactionsChangeListener(this);
bsqWalletService.addBsqBalanceListener(this);
btcWalletService.getChainHeightProperty().addListener(walletChainHeightListener);

Expand All @@ -207,7 +204,7 @@ protected void activate() {
protected void deactivate() {
bsqBalanceUtil.deactivate();
sortedList.comparatorProperty().unbind();
bsqWalletService.getWalletTransactions().removeListener(walletBsqTransactionsListener);
bsqWalletService.removeWalletTransactionsChangeListener(this);
bsqWalletService.removeBsqBalanceListener(this);
btcWalletService.getChainHeightProperty().removeListener(walletChainHeightListener);
daoFacade.removeBsqStateListener(this);
Expand Down Expand Up @@ -254,6 +251,15 @@ public void onParseBlockChainComplete() {
}
}

///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onWalletTransactionsChange() {
updateList();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Private
Expand Down Expand Up @@ -299,8 +305,7 @@ private void onUpdateAnyChainHeight() {
private void updateList() {
observableList.forEach(BsqTxListItem::cleanup);

// copy list to avoid ConcurrentModificationException
final List<Transaction> walletTransactions = new ArrayList<>(bsqWalletService.getWalletTransactions());
List<Transaction> walletTransactions = bsqWalletService.getClonedWalletTransactions();
List<BsqTxListItem> items = walletTransactions.stream()
.map(transaction -> {
return new BsqTxListItem(transaction,
Expand Down