Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve TempProposal processing (fixes #3143) #3148

Merged
merged 1 commit into from Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/java/bisq/core/dao/DaoSetup.java
Expand Up @@ -27,6 +27,7 @@
import bisq.core.dao.governance.bond.role.BondedRolesRepository;
import bisq.core.dao.governance.period.CycleService;
import bisq.core.dao.governance.proofofburn.ProofOfBurnService;
import bisq.core.dao.governance.proposal.ProposalListPresentation;
import bisq.core.dao.governance.proposal.ProposalService;
import bisq.core.dao.governance.voteresult.MissingDataRequestService;
import bisq.core.dao.governance.voteresult.VoteResultService;
Expand Down Expand Up @@ -59,6 +60,7 @@ public DaoSetup(BsqNodeProvider bsqNodeProvider,
CycleService cycleService,
BallotListService ballotListService,
ProposalService proposalService,
ProposalListPresentation proposalListPresentation,
BlindVoteListService blindVoteListService,
MyBlindVoteListService myBlindVoteListService,
VoteRevealService voteRevealService,
Expand Down Expand Up @@ -90,6 +92,7 @@ public DaoSetup(BsqNodeProvider bsqNodeProvider,
daoSetupServices.add(cycleService);
daoSetupServices.add(ballotListService);
daoSetupServices.add(proposalService);
daoSetupServices.add(proposalListPresentation);
daoSetupServices.add(blindVoteListService);
daoSetupServices.add(myBlindVoteListService);
daoSetupServices.add(voteRevealService);
Expand Down
Expand Up @@ -18,12 +18,20 @@
package bisq.core.dao.governance.proposal;

import bisq.core.btc.wallet.BsqWalletService;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;

import bisq.common.UserThread;

import org.bitcoinj.core.TransactionConfidence;

import com.google.inject.Inject;
Expand All @@ -47,7 +55,8 @@
* our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated.
*/
@Slf4j
public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener {
public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener,
MyProposalListService.Listener, DaoSetupService {
private final ProposalService proposalService;
private final DaoStateService daoStateService;
private final MyProposalListService myProposalListService;
Expand All @@ -56,6 +65,8 @@ public class ProposalListPresentation implements DaoStateListener, MyProposalLis
private final ObservableList<Proposal> allProposals = FXCollections.observableArrayList();
@Getter
private final FilteredList<Proposal> activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals);
private final ListChangeListener<Proposal> proposalListChangeListener;
private boolean tempProposalsChanged;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -65,6 +76,7 @@ public class ProposalListPresentation implements DaoStateListener, MyProposalLis
@Inject
public ProposalListPresentation(ProposalService proposalService,
DaoStateService daoStateService,
P2PDataStorage p2PDataStorage,
MyProposalListService myProposalListService,
BsqWalletService bsqWalletService,
ProposalValidatorProvider validatorProvider) {
Expand All @@ -75,13 +87,30 @@ public ProposalListPresentation(ProposalService proposalService,
this.validatorProvider = validatorProvider;

daoStateService.addDaoStateListener(this);
p2PDataStorage.addHashMapChangedListener(this);
myProposalListService.addListener(this);

proposalService.getTempProposals().addListener((ListChangeListener<Proposal>) c -> {
updateLists();
});
proposalService.getProposalPayloads().addListener((ListChangeListener<ProposalPayload>) c -> {
updateLists();
proposalListChangeListener = c -> updateLists();
}


///////////////////////////////////////////////////////////////////////////////////////////
// DaoSetupService
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void addListeners() {
}

@Override
public void start() {
// We must set the listeners initially and not on onParseBlockChainComplete as activeOrMyUnconfirmedProposals
// is used in voteResults which can be called earlier during sync.
// To avoid unneeded upDateLists calls we delay one render frame so that once the proposalService is complete we
// register out listeners.
UserThread.execute(() -> {
proposalService.getTempProposals().addListener(proposalListChangeListener);
proposalService.getProposalPayloads().addListener((ListChangeListener<ProposalPayload>) c -> updateLists());
});
}

Expand All @@ -96,6 +125,43 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
}


///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
}

@Override
public void onBatchRemoveExpiredDataStarted() {
// We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each
// remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call
// the updateLists method.
proposalService.getTempProposals().removeListener(proposalListChangeListener);
}

@Override
public void onBatchRemoveExpiredDataCompleted() {
proposalService.getTempProposals().addListener(proposalListChangeListener);
// We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms.
if (tempProposalsChanged) {
updateLists();
tempProposalsChanged = false;
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// MyProposalListService.Listener
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Expand Up @@ -279,7 +279,7 @@ private void onProtectedDataRemoved(ProtectedStorageEntry entry) {
if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) {
if (tempProposals.contains(proposal)) {
tempProposals.remove(proposal);
log.info("We received a remove request for a TempProposalPayload and have removed the proposal " +
log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " +
"from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " +
"txInPastCycle={}, unconfirmedOrNonBsqTx={}",
proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx);
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/java/bisq/network/p2p/network/Connection.java
Expand Up @@ -355,7 +355,7 @@ public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) {
data = ((AddDataMessage) msg).getProtectedStorageEntry().getProtectedStoragePayload();
}
// Monitoring nodes have only one capability set, we don't want to log those
log.debug("We did not send the message because the peer does not support our required capabilities. " +
log.info("We did not send the message because the peer does not support our required capabilities. " +
"messageClass={}, peer={}, peers supportedCapabilities={}",
data.getClass().getSimpleName(), peersNodeAddressOptional, capabilities);
}
Expand Down
Expand Up @@ -24,4 +24,12 @@ public interface HashMapChangedListener {

@SuppressWarnings("UnusedParameters")
void onRemoved(ProtectedStorageEntry data);

// We process all expired entries after a delay (60 s) after onBootstrapComplete.
// We notify listeners of start and completion so they can optimize to only update after batch processing is done.
default void onBatchRemoveExpiredDataStarted() {
}

default void onBatchRemoveExpiredDataCompleted() {
}
}
Expand Up @@ -198,9 +198,12 @@ public void onBootstrapComplete() {
}
});

toRemoveSet.forEach(
protectedDataToRemove -> hashMapChangedListeners.forEach(
// Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying
// about start and end of iteration.
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted);
toRemoveSet.forEach(protectedDataToRemove -> hashMapChangedListeners.forEach(
listener -> listener.onRemoved(protectedDataToRemove)));
hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted);

if (sequenceNumberMap.size() > 1000)
sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap()));
Expand Down
Binary file removed p2p/src/main/resources/TempProposalStore_BTC_MAINNET
Binary file not shown.