Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle dao state conflicts better #2674

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions common/src/main/java/bisq/common/storage/FileManager.java
Expand Up @@ -162,16 +162,21 @@ void shutDown() {
}
}

public synchronized void removeAndBackupFile(String fileName) throws IOException {
File corruptedBackupDir = new File(Paths.get(dir.getAbsolutePath(), "backup_of_corrupted_data").toString());
public static void removeAndBackupFile(File dbDir, File storageFile, String fileName, String backupFolderName)
throws IOException {
File corruptedBackupDir = new File(Paths.get(dbDir.getAbsolutePath(), backupFolderName).toString());
if (!corruptedBackupDir.exists())
if (!corruptedBackupDir.mkdir())
log.warn("make dir failed");

File corruptedFile = new File(Paths.get(dir.getAbsolutePath(), "backup_of_corrupted_data", fileName).toString());
File corruptedFile = new File(Paths.get(dbDir.getAbsolutePath(), backupFolderName, fileName).toString());
FileUtil.renameFile(storageFile, corruptedFile);
}

public synchronized void removeAndBackupFile(String fileName) throws IOException {
removeAndBackupFile(dir, storageFile, fileName, "backup_of_corrupted_data");
}

public synchronized void backupFile(String fileName, int numMaxBackupFiles) {
FileUtil.rollingBackup(dir, fileName, numMaxBackupFiles);
}
Expand Down
Expand Up @@ -36,6 +36,7 @@

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.seed.SeedNodeRepository;

import bisq.common.UserThread;
import bisq.common.crypto.Hash;
Expand All @@ -49,6 +50,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -81,14 +83,17 @@ public interface Listener {
private final GenesisTxInfo genesisTxInfo;
private final PeriodService periodService;
private final BlindVoteListService blindVoteListService;
private final Set<String> seedNodeAddresses;

@Getter
private final LinkedList<BlindVoteStateBlock> blindVoteStateBlockChain = new LinkedList<>();
@Getter
private final LinkedList<BlindVoteStateHash> blindVoteStateHashChain = new LinkedList<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
@Getter
private boolean isInConflict;
private boolean isInConflictWithNonSeedNode;
@Getter
private boolean isInConflictWithSeedNode;
private boolean parseBlockChainComplete;


Expand All @@ -101,12 +106,16 @@ public BlindVoteStateMonitoringService(DaoStateService daoStateService,
BlindVoteStateNetworkService blindVoteStateNetworkService,
GenesisTxInfo genesisTxInfo,
PeriodService periodService,
BlindVoteListService blindVoteListService) {
BlindVoteListService blindVoteListService,
SeedNodeRepository seedNodeRepository) {
this.daoStateService = daoStateService;
this.blindVoteStateNetworkService = blindVoteStateNetworkService;
this.genesisTxInfo = genesisTxInfo;
this.periodService = periodService;
this.blindVoteListService = blindVoteListService;
seedNodeAddresses = seedNodeRepository.getSeedNodeAddresses().stream()
.map(NodeAddress::getFullAddress)
.collect(Collectors.toSet());
}


Expand Down Expand Up @@ -276,7 +285,8 @@ private void maybeUpdateHashChain(int blockHeight) {

private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean isInConflict = new AtomicBoolean(this.isInConflict);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
StringBuilder sb = new StringBuilder();
blindVoteStateBlockChain.stream()
.filter(e -> e.getHeight() == blindVoteStateHash.getHeight()).findAny()
Expand All @@ -286,7 +296,12 @@ private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteState
daoStateBlock.putInPeersMap(peersNodeAddressAsString, blindVoteStateHash);
if (!daoStateBlock.getMyStateHash().hasEqualHash(blindVoteStateHash)) {
daoStateBlock.putInConflictMap(peersNodeAddressAsString, blindVoteStateHash);
isInConflict.set(true);
if (seedNodeAddresses.contains(peersNodeAddressAsString)) {
inConflictWithSeedNode.set(true);
} else {
inConflictWithNonSeedNode.set(true);
}

sb.append("We received a block hash from peer ")
.append(peersNodeAddressAsString)
.append(" which conflicts with our block hash.\n")
Expand All @@ -298,11 +313,15 @@ private boolean processPeersBlindVoteStateHash(BlindVoteStateHash blindVoteState
changed.set(true);
});

this.isInConflict = isInConflict.get();
this.isInConflictWithNonSeedNode = inConflictWithNonSeedNode.get();
this.isInConflictWithSeedNode = inConflictWithSeedNode.get();

String conflictMsg = sb.toString();
if (this.isInConflict && !conflictMsg.isEmpty()) {
log.warn(conflictMsg);
if (!conflictMsg.isEmpty()) {
if (this.isInConflictWithSeedNode)
log.warn("Conflict with seed nodes: {}", conflictMsg);
else if (this.isInConflictWithNonSeedNode)
log.info("Conflict with non-seed nodes: {}", conflictMsg);
}

if (notifyListeners && changed.get()) {
Expand Down
Expand Up @@ -33,6 +33,7 @@

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.seed.SeedNodeRepository;

import bisq.common.UserThread;
import bisq.common.crypto.Hash;
Expand All @@ -48,6 +49,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,6 +85,8 @@ public interface Listener {
private final DaoStateService daoStateService;
private final DaoStateNetworkService daoStateNetworkService;
private final GenesisTxInfo genesisTxInfo;
private final Set<String> seedNodeAddresses;


@Getter
private final LinkedList<DaoStateBlock> daoStateBlockChain = new LinkedList<>();
Expand All @@ -91,7 +95,9 @@ public interface Listener {
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private boolean parseBlockChainComplete;
@Getter
private boolean isInConflict;
private boolean isInConflictWithNonSeedNode;
@Getter
private boolean isInConflictWithSeedNode;
@Getter
private ObservableList<UtxoMismatch> utxoMismatches = FXCollections.observableArrayList();

Expand All @@ -103,10 +109,14 @@ public interface Listener {
@Inject
public DaoStateMonitoringService(DaoStateService daoStateService,
DaoStateNetworkService daoStateNetworkService,
GenesisTxInfo genesisTxInfo) {
GenesisTxInfo genesisTxInfo,
SeedNodeRepository seedNodeRepository) {
this.daoStateService = daoStateService;
this.daoStateNetworkService = daoStateNetworkService;
this.genesisTxInfo = genesisTxInfo;
seedNodeAddresses = seedNodeRepository.getSeedNodeAddresses().stream()
.map(NodeAddress::getFullAddress)
.collect(Collectors.toSet());
}


Expand Down Expand Up @@ -283,7 +293,8 @@ private void updateHashChain(Block block) {

private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean isInConflict = new AtomicBoolean(this.isInConflict);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
StringBuilder sb = new StringBuilder();
daoStateBlockChain.stream()
.filter(e -> e.getHeight() == daoStateHash.getHeight()).findAny()
Expand All @@ -293,7 +304,11 @@ private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<Nod
daoStateBlock.putInPeersMap(peersNodeAddressAsString, daoStateHash);
if (!daoStateBlock.getMyStateHash().hasEqualHash(daoStateHash)) {
daoStateBlock.putInConflictMap(peersNodeAddressAsString, daoStateHash);
isInConflict.set(true);
if (seedNodeAddresses.contains(peersNodeAddressAsString)) {
inConflictWithSeedNode.set(true);
} else {
inConflictWithNonSeedNode.set(true);
}
sb.append("We received a block hash from peer ")
.append(peersNodeAddressAsString)
.append(" which conflicts with our block hash.\n")
Expand All @@ -305,13 +320,18 @@ private boolean processPeersDaoStateHash(DaoStateHash daoStateHash, Optional<Nod
changed.set(true);
});

this.isInConflict = isInConflict.get();
this.isInConflictWithNonSeedNode = inConflictWithNonSeedNode.get();
this.isInConflictWithSeedNode = inConflictWithSeedNode.get();

String conflictMsg = sb.toString();
if (this.isInConflict && !conflictMsg.isEmpty()) {
log.warn(conflictMsg);
if (!conflictMsg.isEmpty()) {
if (this.isInConflictWithSeedNode)
log.warn("Conflict with seed nodes: {}", conflictMsg);
else if (this.isInConflictWithNonSeedNode)
log.info("Conflict with non-seed nodes: {}", conflictMsg);
}


if (notifyListeners && changed.get()) {
listeners.forEach(Listener::onChangeAfterBatchProcessing);
}
Expand Down
Expand Up @@ -36,6 +36,7 @@

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.seed.SeedNodeRepository;

import bisq.common.UserThread;
import bisq.common.crypto.Hash;
Expand All @@ -49,6 +50,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -81,14 +83,18 @@ public interface Listener {
private final GenesisTxInfo genesisTxInfo;
private final PeriodService periodService;
private final ProposalService proposalService;
private final Set<String> seedNodeAddresses;


@Getter
private final LinkedList<ProposalStateBlock> proposalStateBlockChain = new LinkedList<>();
@Getter
private final LinkedList<ProposalStateHash> proposalStateHashChain = new LinkedList<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
@Getter
private boolean isInConflict;
private boolean isInConflictWithNonSeedNode;
@Getter
private boolean isInConflictWithSeedNode;
private boolean parseBlockChainComplete;


Expand All @@ -101,12 +107,16 @@ public ProposalStateMonitoringService(DaoStateService daoStateService,
ProposalStateNetworkService proposalStateNetworkService,
GenesisTxInfo genesisTxInfo,
PeriodService periodService,
ProposalService proposalService) {
ProposalService proposalService,
SeedNodeRepository seedNodeRepository) {
this.daoStateService = daoStateService;
this.proposalStateNetworkService = proposalStateNetworkService;
this.genesisTxInfo = genesisTxInfo;
this.periodService = periodService;
this.proposalService = proposalService;
seedNodeAddresses = seedNodeRepository.getSeedNodeAddresses().stream()
.map(NodeAddress::getFullAddress)
.collect(Collectors.toSet());
}


Expand Down Expand Up @@ -280,7 +290,8 @@ private boolean maybeUpdateHashChain(int blockHeight) {

private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean isInConflict = new AtomicBoolean(this.isInConflict);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
StringBuilder sb = new StringBuilder();
proposalStateBlockChain.stream()
.filter(e -> e.getHeight() == proposalStateHash.getHeight()).findAny()
Expand All @@ -290,7 +301,11 @@ private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHas
daoStateBlock.putInPeersMap(peersNodeAddressAsString, proposalStateHash);
if (!daoStateBlock.getMyStateHash().hasEqualHash(proposalStateHash)) {
daoStateBlock.putInConflictMap(peersNodeAddressAsString, proposalStateHash);
isInConflict.set(true);
if (seedNodeAddresses.contains(peersNodeAddressAsString)) {
inConflictWithSeedNode.set(true);
} else {
inConflictWithNonSeedNode.set(true);
}
sb.append("We received a block hash from peer ")
.append(peersNodeAddressAsString)
.append(" which conflicts with our block hash.\n")
Expand All @@ -302,11 +317,15 @@ private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHas
changed.set(true);
});

this.isInConflict = isInConflict.get();
this.isInConflictWithNonSeedNode = inConflictWithNonSeedNode.get();
this.isInConflictWithSeedNode = inConflictWithSeedNode.get();

String conflictMsg = sb.toString();
if (this.isInConflict && !conflictMsg.isEmpty()) {
log.warn(conflictMsg);
if (!conflictMsg.isEmpty()) {
if (this.isInConflictWithSeedNode)
log.warn("Conflict with seed nodes: {}", conflictMsg);
else if (this.isInConflictWithNonSeedNode)
log.info("Conflict with non-seed nodes: {}", conflictMsg);
}

if (notifyListeners && changed.get()) {
Expand Down
18 changes: 8 additions & 10 deletions core/src/main/resources/i18n/displayStrings.properties
Expand Up @@ -910,7 +910,8 @@ settings.preferences.selectCurrencyNetwork=Select network
setting.preferences.daoOptions=DAO options
setting.preferences.dao.resync.label=Rebuild DAO state from genesis tx
setting.preferences.dao.resync.button=Resync
setting.preferences.dao.resync.popup=After an application restart the BSQ consensus state will be rebuilt from the genesis transaction.
setting.preferences.dao.resync.popup=After an application restart the P2P network governance data will be reloaded from \
the seed nodes and the BSQ consensus state will be rebuilt from the genesis transaction.
setting.preferences.dao.isDaoFullNode=Run Bisq as DAO full node
setting.preferences.dao.rpcUser=RPC username
setting.preferences.dao.rpcPw=RPC password
Expand Down Expand Up @@ -1904,11 +1905,9 @@ dao.monitor.requestAlHashes=Request all hashes
dao.monitor.resync=Resync DAO state
dao.monitor.table.header.cycleBlockHeight=Cycle / block height
dao.monitor.table.cycleBlockHeight=Cycle {0} / block {1}
dao.monitor.table.seedPeers=Seed node: {0}

dao.monitor.daoState.headline=DAO state
dao.monitor.daoState.daoStateInSync=Your local DAO state is in consensus with the network
dao.monitor.daoState.daoStateNotInSync=Your local DAO state is not in consensus with the network. Please resync your \
DAO state.
dao.monitor.daoState.table.headline=Chain of DAO state hashes
dao.monitor.daoState.table.blockHeight=Block height
dao.monitor.daoState.table.hash=Hash of DAO state
Expand All @@ -1920,21 +1919,20 @@ dao.monitor.daoState.utxoConflicts.sumUtxo=Sum of all UTXO: {0} BSQ
dao.monitor.daoState.utxoConflicts.sumBsq=Sum of all BSQ: {0} BSQ

dao.monitor.proposal.headline=Proposals state
dao.monitor.proposal.daoStateInSync=Your local proposals state is in consensus with the network
dao.monitor.proposal.daoStateNotInSync=Your local proposals state is not in consensus with the network. Please restart your \
application.
dao.monitor.proposal.table.headline=Chain of proposal state hashes
dao.monitor.proposal.conflictTable.headline=Proposal state hashes from peers in conflict

dao.monitor.proposal.table.hash=Hash of proposal state
dao.monitor.proposal.table.prev=Previous hash
dao.monitor.proposal.table.numProposals=No. proposals

dao.monitor.isInConflictWithSeedNode=Your local data is not in consensus with at least one seed node. \
Please resync the DAO state.
dao.monitor.isInConflictWithNonSeedNode=One of your peers is not in consensus with the network but your node \
is in sync with the seed nodes.
dao.monitor.daoStateInSync=Your local node is in consensus with the network

dao.monitor.blindVote.headline=Blind votes state
dao.monitor.blindVote.daoStateInSync=Your local blind votes state is in consensus with the network
dao.monitor.blindVote.daoStateNotInSync=Your local blind votes state is not in consensus with the network. Please restart your \
application.
dao.monitor.blindVote.table.headline=Chain of blind vote state hashes
dao.monitor.blindVote.conflictTable.headline=Blind vote state hashes from peers in conflict
dao.monitor.blindVote.table.hash=Hash of blind vote state
Expand Down
Expand Up @@ -20,8 +20,12 @@
import bisq.core.dao.monitoring.model.StateHash;
import bisq.core.locale.Res;

import bisq.network.p2p.NodeAddress;

import bisq.common.util.Utilities;

import java.util.Set;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,10 +40,15 @@ public abstract class StateInConflictListItem<T extends StateHash> {
private final String prevHash;
private final T stateHash;

protected StateInConflictListItem(String peerAddress, T stateHash, int cycleIndex) {
protected StateInConflictListItem(String peerAddress, T stateHash, int cycleIndex,
Set<NodeAddress> seedNodeAddresses) {
this.stateHash = stateHash;
this.peerAddress = peerAddress;
height = Res.get("dao.monitor.table.cycleBlockHeight", cycleIndex + 1, String.valueOf(stateHash.getHeight()));
this.peerAddress = seedNodeAddresses.stream().anyMatch(e -> e.getFullAddress().equals(peerAddress)) ?
Res.get("dao.monitor.table.seedPeers", peerAddress) :
peerAddress;
height = Res.get("dao.monitor.table.cycleBlockHeight",
cycleIndex + 1,
String.valueOf(stateHash.getHeight()));
hash = Utilities.bytesAsHexString(stateHash.getHash());
prevHash = stateHash.getPrevHash().length > 0 ?
Utilities.bytesAsHexString(stateHash.getPrevHash()) : "-";
Expand Down