diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index 321e3e2eb9e..ec67a5a0deb 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -20,8 +20,12 @@ import bisq.core.dao.DaoSetupService; import bisq.core.dao.node.full.RawBlock; import bisq.core.dao.node.parser.BlockParser; +import bisq.core.dao.node.parser.exceptions.BlockHashNotConnectingException; +import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException; +import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.DaoStateSnapshotService; +import bisq.core.dao.state.model.blockchain.Block; import bisq.network.p2p.P2PService; import bisq.network.p2p.P2PServiceListener; @@ -30,6 +34,11 @@ import com.google.inject.Inject; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; + import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; @@ -51,6 +60,7 @@ public abstract class BsqNode implements DaoSetupService { protected boolean p2pNetworkReady; @Nullable protected ErrorMessageHandler errorMessageHandler; + protected List pendingBlocks = new ArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -179,19 +189,87 @@ protected void onParseBlockChainComplete() { log.info("onParseBlockChainComplete"); parseBlockchainComplete = true; daoStateService.onParseBlockChainComplete(); - - // log.error("COMPLETED: sb1={}\nsb2={}", BlockParser.sb1.toString(), BlockParser.sb2.toString()); - // log.error("equals? " + BlockParser.sb1.toString().equals(BlockParser.sb2.toString())); - // Utilities.copyToClipboard(BlockParser.sb1.toString() + "\n\n\n" + BlockParser.sb2.toString()); } @SuppressWarnings("WeakerAccess") protected void startReOrgFromLastSnapshot() { daoStateSnapshotService.applySnapshot(true); - startParseBlocks(); } - protected boolean isBlockAlreadyAdded(RawBlock rawBlock) { - return daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent(); + + protected Optional doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException { + // We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis + // height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky + // to change now as it used in many areas.) + if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) { + log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight()); + return Optional.empty(); + } + + try { + Block block = blockParser.parseBlock(rawBlock); + + if (pendingBlocks.contains(rawBlock)) + pendingBlocks.remove(rawBlock); + + // After parsing we check if we have pending blocks we might have received earlier but which have been + // not connecting from the latest height we had. The list is sorted by height + if (!pendingBlocks.isEmpty()) { + // To avoid ConcurrentModificationException we copy the list. It might be altered in the method call + ArrayList tempPendingBlocks = new ArrayList<>(pendingBlocks); + for (RawBlock tempPendingBlock : tempPendingBlocks) { + try { + doParseBlock(tempPendingBlock); + } catch (RequiredReorgFromSnapshotException e1) { + // In case we got a reorg we break the iteration + break; + } + } + } + + return Optional.of(block); + } catch (BlockHeightNotConnectingException e) { + // There is no guaranteed order how we receive blocks. We could have received block 102 before 101. + // If block is in future we move the block to teh pendingBlocks list. At next block we look up the + // list if there is any potential candidate with the correct height and if so we remove that from that list. + + int heightForNextBlock = daoStateService.getChainHeight() + 1; + if (rawBlock.getHeight() > heightForNextBlock) { + pendingBlocks.add(rawBlock); + pendingBlocks.sort(Comparator.comparing(RawBlock::getHeight)); + log.info("We received an block with a future block height. We store it as pending and try to apply " + + "it at the next block. rawBlock: height/hash={}/{}", rawBlock.getHeight(), rawBlock.getHash()); + } else if (rawBlock.getHeight() >= daoStateService.getGenesisBlockHeight()) { + // We received an older block. We compare if we have it in our chain. + Optional optionalBlock = daoStateService.getBlockAtHeight(rawBlock.getHeight()); + if (optionalBlock.isPresent()) { + if (optionalBlock.get().getHash().equals(rawBlock.getPreviousBlockHash())) { + log.info("We received an old block we have already parsed and added. We ignore it."); + } else { + log.info("We received an old block with a different hash. We ignore it. Hash={}", rawBlock.getHash()); + } + } else { + log.info("In case we have reset from genesis height we would not find the block"); + } + } else { + log.info("We ignore it as it was before genesis height"); + } + } catch (BlockHashNotConnectingException throwable) { + Optional lastBlock = daoStateService.getLastBlock(); + log.warn("Block not connecting:\n" + + "New block height/hash/previousBlockHash={}/{}/{}, latest block height/hash={}/{}", + rawBlock.getHeight(), + rawBlock.getHash(), + rawBlock.getPreviousBlockHash(), + lastBlock.isPresent() ? lastBlock.get().getHeight() : "lastBlock not present", + lastBlock.isPresent() ? lastBlock.get().getHash() : "lastBlock not present"); + + pendingBlocks.clear(); + startReOrgFromLastSnapshot(); + throw new RequiredReorgFromSnapshotException(rawBlock); + } + + + return Optional.empty(); } } diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index 18c3ba0b0ca..c0585250854 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -21,7 +21,7 @@ import bisq.core.dao.node.explorer.ExportJsonFilesService; import bisq.core.dao.node.full.network.FullNodeNetworkService; import bisq.core.dao.node.parser.BlockParser; -import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException; +import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.DaoStateSnapshotService; import bisq.core.dao.state.model.blockchain.Block; @@ -106,6 +106,15 @@ protected void startParseBlocks() { requestChainHeadHeightAndParseBlocks(getStartBlockHeight()); } + @Override + protected void startReOrgFromLastSnapshot() { + super.startReOrgFromLastSnapshot(); + + int startBlockHeight = getStartBlockHeight(); + rpcService.requestChainHeadHeight(chainHeight -> parseBlocksOnHeadHeight(startBlockHeight, chainHeight), + this::handleError); + } + @Override protected void onP2PNetworkReady() { super.onP2PNetworkReady(); @@ -137,13 +146,9 @@ private void addBlockHandler() { if (!addBlockHandlerAdded) { addBlockHandlerAdded = true; rpcService.addNewBtcBlockHandler(rawBlock -> { - if (!isBlockAlreadyAdded(rawBlock)) { - try { - Block block = blockParser.parseBlock(rawBlock); - onNewBlock(block); - } catch (BlockNotConnectingException throwable) { - handleError(throwable); - } + try { + doParseBlock(rawBlock).ifPresent(this::onNewBlock); + } catch (RequiredReorgFromSnapshotException ignore) { } }, this::handleError); @@ -190,13 +195,7 @@ private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) { // if we are at chainTip, so do not include here another check as it would // not trigger the listener registration. parseBlocksIfNewBlockAvailable(chainHeight); - }, throwable -> { - if (throwable instanceof BlockNotConnectingException) { - startReOrgFromLastSnapshot(); - } else { - handleError(throwable); - } - }); + }, this::handleError); } else { log.warn("We are trying to start with a block which is above the chain height of bitcoin core. " + "We need probably wait longer until bitcoin core has fully synced. " + @@ -210,33 +209,29 @@ private void parseBlocks(int startBlockHeight, Consumer newBlockHandler, ResultHandler resultHandler, Consumer errorHandler) { - parseBlock(startBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler); + parseBlockRecursively(startBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler); } - // Recursively request and parse all blocks - private void parseBlock(int blockHeight, int chainHeight, - Consumer newBlockHandler, ResultHandler resultHandler, - Consumer errorHandler) { + private void parseBlockRecursively(int blockHeight, + int chainHeight, + Consumer newBlockHandler, + ResultHandler resultHandler, + Consumer errorHandler) { rpcService.requestBtcBlock(blockHeight, rawBlock -> { - if (!isBlockAlreadyAdded(rawBlock)) { - try { - Block block = blockParser.parseBlock(rawBlock); - newBlockHandler.accept(block); - - // Increment blockHeight and recursively call parseBlockAsync until we reach chainHeight - if (blockHeight < chainHeight) { - final int newBlockHeight = blockHeight + 1; - parseBlock(newBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler); - } else { - // We are done - resultHandler.handleResult(); - } - } catch (BlockNotConnectingException e) { - errorHandler.accept(e); + try { + doParseBlock(rawBlock).ifPresent(newBlockHandler); + + // Increment blockHeight and recursively call parseBlockAsync until we reach chainHeight + if (blockHeight < chainHeight) { + int newBlockHeight = blockHeight + 1; + parseBlockRecursively(newBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler); + } else { + // We are done + resultHandler.handleResult(); } - } else { - log.info("Block was already added height=", rawBlock.getHeight()); + } catch (RequiredReorgFromSnapshotException ignore) { + // If we get a reorg we don't continue to call parseBlockRecursively } }, errorHandler); @@ -245,16 +240,13 @@ private void parseBlock(int blockHeight, int chainHeight, private void handleError(Throwable throwable) { String errorMessage = "An error occurred: Error=" + throwable.toString(); log.error(errorMessage); - - if (throwable instanceof BlockNotConnectingException) { - startReOrgFromLastSnapshot(); - } else if (throwable instanceof RpcException && + if (throwable instanceof RpcException && throwable.getCause() != null && throwable.getCause() instanceof HttpLayerException && ((HttpLayerException) throwable.getCause()).getCode() == 1004004) { errorMessage = "You have configured Bisq to run as DAO full node but there is not " + "localhost Bitcoin Core node detected. You need to have Bitcoin Core started and synced before " + - "starting Bisq."; + "starting Bisq. Please restart Bisq with proper DAO full node setup or switch to lite node mode."; } if (errorMessageHandler != null) diff --git a/core/src/main/java/bisq/core/dao/node/full/RpcService.java b/core/src/main/java/bisq/core/dao/node/full/RpcService.java index fe7b3de5284..3dd890b64c8 100644 --- a/core/src/main/java/bisq/core/dao/node/full/RpcService.java +++ b/core/src/main/java/bisq/core/dao/node/full/RpcService.java @@ -284,10 +284,12 @@ private RawTx getTxFromRawTransaction(RawTransaction rawBtcTx, com.neemre.btcdcl try { opReturnData = Utils.HEX.decode(chunks[1]); } catch (Throwable t) { - // We get sometimes exceptions, seems BitcoinJ - // cannot handle all existing OP_RETURN data, but we ignore them - // anyway as our OP_RETURN data is valid in BitcoinJ - log.warn("Error at Utils.HEX.decode(chunks[1]): " + t.toString() + " / chunks[1]=" + chunks[1]); + log.warn("Error at Utils.HEX.decode(chunks[1]): " + t.toString() + + " / chunks[1]=" + chunks[1] + + "\nWe get sometimes exceptions with opReturn data, seems BitcoinJ " + + "cannot handle all " + + "existing OP_RETURN data, but we ignore them anyway as the OP_RETURN " + + "data used for DAO transactions are all valid in BitcoinJ"); } } } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index e0c2a7482c4..c0060bf9736 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -93,9 +93,8 @@ public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connecti Log.traceCall(getBlocksRequest + "\n\tconnection=" + connection); List blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight())); List rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList()); - final GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce()); - log.debug("getBlocksResponse " + getBlocksResponse.getRequestNonce()); - log.info("Received getBlocksResponse from {} for blocks from height {}", + GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce()); + log.info("Received GetBlocksRequest from {} for blocks from height {}", connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight()); if (timeoutTimer == null) { timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions @@ -108,7 +107,7 @@ public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connecti } SettableFuture future = networkNode.sendMessage(connection, getBlocksResponse); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(Connection connection) { if (!stopped) { diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 89dd0ba3c31..ef3f18aadf2 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -23,7 +23,7 @@ import bisq.core.dao.node.messages.GetBlocksResponse; import bisq.core.dao.node.messages.NewBlockBroadcastMessage; import bisq.core.dao.node.parser.BlockParser; -import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException; +import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.DaoStateSnapshotService; @@ -121,6 +121,15 @@ protected void startParseBlocks() { liteNodeNetworkService.requestBlocks(getStartBlockHeight()); } + @Override + protected void startReOrgFromLastSnapshot() { + super.startReOrgFromLastSnapshot(); + + int startBlockHeight = getStartBlockHeight(); + liteNodeNetworkService.reset(); + liteNodeNetworkService.requestBlocks(startBlockHeight); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Private @@ -137,30 +146,24 @@ private void onRequestedBlocksReceived(List blockList) { // 144 blocks a day would result in about 4000 in a month, so if a user downloads the app after 1 months latest // release it will be a bit of a performance hit. It is a one time event as the snapshots gets created and be // used at next startup. - long startTs = System.currentTimeMillis(); - blockList.forEach(this::parseBlock); - log.info("Parsing of {} blocks took {} sec.", blockList.size(), (System.currentTimeMillis() - startTs) / 1000D); + for (RawBlock block : blockList) { + try { + doParseBlock(block); + } catch (RequiredReorgFromSnapshotException e1) { + // In case we got a reorg we break the iteration + break; + } + } + onParseBlockChainComplete(); } // We received a new block private void onNewBlockReceived(RawBlock block) { - log.info("onNewBlockReceived: block at height {}", block.getHeight()); - parseBlock(block); - } - - private void parseBlock(RawBlock rawBlock) { - if (!isBlockAlreadyAdded(rawBlock)) { - try { - blockParser.parseBlock(rawBlock); - } catch (BlockNotConnectingException throwable) { - startReOrgFromLastSnapshot(); - } catch (Throwable throwable) { - log.error(throwable.toString()); - throwable.printStackTrace(); - if (errorMessageHandler != null) - errorMessageHandler.handleErrorMessage(throwable.toString()); - } + log.info("onNewBlockReceived: block at height {}, hash={}", block.getHeight(), block.getHash()); + try { + doParseBlock(block); + } catch (RequiredReorgFromSnapshotException ignore) { } } } diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index d9a20d66342..0bd7df0ce5b 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -151,6 +151,13 @@ public void requestBlocks(int startBlockHeight) { } } + public void reset() { + lastRequestedBlockHeight = 0; + lastReceivedBlockHeight = 0; + retryCounter = 0; + requestBlocksHandlerMap.values().forEach(RequestBlocksHandler::cancel); + } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation @@ -273,8 +280,6 @@ public void onFault(String errorMessage, @Nullable Connection connection) { log.info("requestBlocks with startBlockHeight={} from peer {}", startBlockHeight, peersNodeAddress); requestBlocksHandler.requestBlocks(); } else { - //TODO check with re-orgs - // FIXME when a lot of blocks are created we get caught here. Seems to be a threading issue... log.warn("startBlockHeight must not be smaller than lastReceivedBlockHeight. That should never happen." + "startBlockHeight={},lastReceivedBlockHeight={}", startBlockHeight, lastReceivedBlockHeight); DevEnv.logErrorAndThrowIfDevMode("startBlockHeight must be larger than lastReceivedBlockHeight. startBlockHeight=" + diff --git a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java index fe105648419..5d71fd9164b 100644 --- a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java +++ b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java @@ -18,7 +18,8 @@ package bisq.core.dao.node.parser; import bisq.core.dao.node.full.RawBlock; -import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException; +import bisq.core.dao.node.parser.exceptions.BlockHashNotConnectingException; +import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.blockchain.Tx; @@ -74,9 +75,10 @@ public BlockParser(TxParser txParser, * * @param rawBlock Contains all transactions of a bitcoin block without any BSQ specific data * @return Block: Gets created from the rawBlock but contains only BSQ specific transactions. - * @throws BlockNotConnectingException If new block does not connect to previous block + * @throws BlockHashNotConnectingException If new block does not connect to previous block + * @throws BlockHeightNotConnectingException If new block height is not current cahin Height + 1 */ - public Block parseBlock(RawBlock rawBlock) throws BlockNotConnectingException { + public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { int blockHeight = rawBlock.getHeight(); log.debug("Parse block at height={} ", blockHeight); @@ -108,41 +110,32 @@ public Block parseBlock(RawBlock rawBlock) throws BlockNotConnectingException { List txList = block.getTxs(); rawBlock.getRawTxs().forEach(rawTx -> - txParser.findTx(rawTx, - genesisTxId, - genesisBlockHeight, - genesisTotalSupply) - .ifPresent(txList::add)); - log.debug("parseBsqTxs took {} ms", rawBlock.getRawTxs().size(), System.currentTimeMillis() - startTs); + txParser.findTx(rawTx, + genesisTxId, + genesisBlockHeight, + genesisTotalSupply) + .ifPresent(txList::add)); + log.info("parseBsqTxs took {} ms", rawBlock.getRawTxs().size(), System.currentTimeMillis() - startTs); daoStateService.onParseBlockComplete(block); return block; } - private void validateIfBlockIsConnecting(RawBlock rawBlock) throws BlockNotConnectingException { + private void validateIfBlockIsConnecting(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException { LinkedList blocks = daoStateService.getBlocks(); - if (!isBlockConnecting(rawBlock, blocks) && !blocks.isEmpty()) { - Block last = blocks.getLast(); - log.warn("addBlock called with a not connecting block. New block:\n" + - "height()={}, hash()={}, lastBlock.height()={}, lastBlock.hash()={}", - rawBlock.getHeight(), - rawBlock.getHash(), - last != null ? last.getHeight() : "null", - last != null ? last.getHash() : "null"); - throw new BlockNotConnectingException(rawBlock); - } + + if (blocks.isEmpty()) + return; + + Block last = blocks.getLast(); + if (last.getHeight() + 1 != rawBlock.getHeight()) + throw new BlockHeightNotConnectingException(rawBlock); + + if (!last.getHash().equals(rawBlock.getPreviousBlockHash())) + throw new BlockHashNotConnectingException(rawBlock); } private boolean isBlockAlreadyAdded(RawBlock rawBlock) { return daoStateService.isBlockHashKnown(rawBlock.getHash()); } - - private boolean isBlockConnecting(RawBlock rawBlock, LinkedList blocks) { - // Case 1: blocks is empty - // Case 2: blocks not empty. Last block must match new blocks getPreviousBlockHash and - // height of last block +1 must be new blocks height - return blocks.isEmpty() || - (blocks.getLast().getHash().equals(rawBlock.getPreviousBlockHash()) && - blocks.getLast().getHeight() + 1 == rawBlock.getHeight()); - } } diff --git a/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockNotConnectingException.java b/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHashNotConnectingException.java similarity index 87% rename from core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockNotConnectingException.java rename to core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHashNotConnectingException.java index 357257994e4..97a0999a1ae 100644 --- a/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockNotConnectingException.java +++ b/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHashNotConnectingException.java @@ -22,11 +22,11 @@ import lombok.Getter; @Getter -public class BlockNotConnectingException extends Exception { +public class BlockHashNotConnectingException extends Exception { private RawBlock rawBlock; - public BlockNotConnectingException(RawBlock rawBlock) { + public BlockHashNotConnectingException(RawBlock rawBlock) { this.rawBlock = rawBlock; } } diff --git a/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHeightNotConnectingException.java b/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHeightNotConnectingException.java new file mode 100644 index 00000000000..f7b0164e4d2 --- /dev/null +++ b/core/src/main/java/bisq/core/dao/node/parser/exceptions/BlockHeightNotConnectingException.java @@ -0,0 +1,32 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.node.parser.exceptions; + +import bisq.core.dao.node.full.RawBlock; + +import lombok.Getter; + +@Getter +public class BlockHeightNotConnectingException extends Exception { + + private RawBlock rawBlock; + + public BlockHeightNotConnectingException(RawBlock rawBlock) { + this.rawBlock = rawBlock; + } +} diff --git a/core/src/main/java/bisq/core/dao/node/parser/exceptions/RequiredReorgFromSnapshotException.java b/core/src/main/java/bisq/core/dao/node/parser/exceptions/RequiredReorgFromSnapshotException.java new file mode 100644 index 00000000000..59a36c66939 --- /dev/null +++ b/core/src/main/java/bisq/core/dao/node/parser/exceptions/RequiredReorgFromSnapshotException.java @@ -0,0 +1,32 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.node.parser.exceptions; + +import bisq.core.dao.node.full.RawBlock; + +import lombok.Getter; + +@Getter +public class RequiredReorgFromSnapshotException extends Exception { + + private RawBlock rawBlock; + + public RequiredReorgFromSnapshotException(RawBlock rawBlock) { + this.rawBlock = rawBlock; + } +} diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateService.java b/core/src/main/java/bisq/core/dao/state/DaoStateService.java index 577e0cf5b4c..dcadd3f7824 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateService.java @@ -193,10 +193,16 @@ public void onNewBlockHeight(int blockHeight) { // Second we get the block added with empty txs public void onNewBlockWithEmptyTxs(Block block) { - daoState.getBlocks().add(block); - daoStateListeners.forEach(l -> l.onEmptyBlockAdded(block)); + if (daoState.getBlocks().isEmpty() && block.getHeight() != getGenesisBlockHeight()) { + log.warn("We don't have any blocks yet and we received a block which is not the genesis block. " + + "We ignore that block as the first block need to be the genesis block. " + + "That might happen in edge cases at reorgs."); + } else { + daoState.getBlocks().add(block); + daoStateListeners.forEach(l -> l.onEmptyBlockAdded(block)); - log.info("New Block added at blockHeight " + block.getHeight()); + log.info("New Block added at blockHeight " + block.getHeight()); + } } // Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed @@ -224,7 +230,7 @@ public LinkedList getBlocks() { public boolean isBlockHashKnown(String blockHash) { // TODO(chirhonul): If performance of O(n) time in number of blocks becomes an issue, // we should keep a HashMap of block hash -> Block to make this method O(1). - return getBlocks().stream().anyMatch(block -> block.getHash() == blockHash); + return getBlocks().stream().anyMatch(block -> block.getHash().equals(blockHash)); } public Optional getLastBlock() { diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java index d701586fc6c..fe20b7de82d 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java @@ -44,6 +44,7 @@ public class DaoStateSnapshotService implements DaoStateListener { private final DaoStateStorageService daoStateStorageService; private DaoState snapshotCandidate; + private int chainHeightOfLastApplySnapshot; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -113,15 +114,32 @@ public void applySnapshot(boolean fromReorg) { DaoState persisted = daoStateStorageService.getPersistedBsqState(); if (persisted != null) { LinkedList blocks = persisted.getBlocks(); + int chainHeightOfPersisted = persisted.getChainHeight(); if (!blocks.isEmpty()) { int heightOfLastBlock = blocks.getLast().getHeight(); log.info("applySnapshot from persisted daoState with height of last block {}", heightOfLastBlock); - if (isValidHeight(heightOfLastBlock)) - daoStateService.applySnapshot(persisted); + if (isValidHeight(heightOfLastBlock)) { + if (chainHeightOfLastApplySnapshot != chainHeightOfPersisted) { + chainHeightOfLastApplySnapshot = chainHeightOfPersisted; + daoStateService.applySnapshot(persisted); + } else { + // The reorg might have been caused by the previous parsing which might contains a range of + // blocks. + log.warn("We applied already a snapshot with chainHeight {}. We will reset the daoState and " + + "start over from the genesis transaction again.", chainHeightOfLastApplySnapshot); + persisted = new DaoState(); + int genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight(); + persisted.setChainHeight(genesisBlockHeight); + chainHeightOfLastApplySnapshot = genesisBlockHeight; + daoStateService.applySnapshot(persisted); + } + } } else if (fromReorg) { log.info("We got a reorg and we want to apply the snapshot but it is empty. That is expected in the first blocks until the " + "first snapshot has been created. We use our applySnapshot method and restart from the genesis tx"); - persisted.setChainHeight(genesisTxInfo.getGenesisBlockHeight()); + int genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight(); + persisted.setChainHeight(genesisBlockHeight); + chainHeightOfLastApplySnapshot = genesisBlockHeight; daoStateService.applySnapshot(persisted); } } else { diff --git a/core/src/main/java/bisq/core/dao/state/model/DaoState.java b/core/src/main/java/bisq/core/dao/state/model/DaoState.java index ff2a21ed3f3..a907ef2a0ec 100644 --- a/core/src/main/java/bisq/core/dao/state/model/DaoState.java +++ b/core/src/main/java/bisq/core/dao/state/model/DaoState.java @@ -69,7 +69,7 @@ public static DaoState getClone(DaoState daoState) { /////////////////////////////////////////////////////////////////////////////////////////// @Getter - private int chainHeight; + private int chainHeight; // Is set initially to genesis height @Getter private final LinkedList blocks; @Getter diff --git a/core/src/test/java/bisq/core/dao/node/full/BlockParserTest.java b/core/src/test/java/bisq/core/dao/node/full/BlockParserTest.java index 502d7c35f75..b2bac9d193c 100644 --- a/core/src/test/java/bisq/core/dao/node/full/BlockParserTest.java +++ b/core/src/test/java/bisq/core/dao/node/full/BlockParserTest.java @@ -19,7 +19,6 @@ import bisq.core.dao.node.parser.BlockParser; import bisq.core.dao.node.parser.TxParser; -import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.model.blockchain.TxInput; import bisq.core.dao.state.model.blockchain.TxOutputKey; @@ -28,8 +27,6 @@ import org.bitcoinj.core.Coin; -import com.neemre.btcdcli4j.core.BitcoindException; -import com.neemre.btcdcli4j.core.CommunicationException; import com.neemre.btcdcli4j.core.domain.RawBlock; import com.neemre.btcdcli4j.core.domain.RawTransaction; @@ -137,7 +134,7 @@ public void testIsBsqTx() { } @Test - public void testParseBlocks() throws BitcoindException, CommunicationException, BlockNotConnectingException, RpcException { + public void testParseBlocks() { // Setup blocks to test, starting before genesis // Only the transactions related to bsq are relevant, no checks are done on correctness of blocks or other txs // so hashes and most other data don't matter diff --git a/core/src/test/java/bisq/core/dao/state/DaoStateServiceTest.java b/core/src/test/java/bisq/core/dao/state/DaoStateServiceTest.java index 79c180652f7..0d9eff411b8 100644 --- a/core/src/test/java/bisq/core/dao/state/DaoStateServiceTest.java +++ b/core/src/test/java/bisq/core/dao/state/DaoStateServiceTest.java @@ -40,8 +40,8 @@ public void testIsBlockHashKnown() { Block block = new Block(0, 1534800000, "fakeblockhash0", null); stateService.onNewBlockWithEmptyTxs(block); Assert.assertEquals( - "Block that was added should exist.", - true, + "Block has to be genesis block to get added.", + false, stateService.isBlockHashKnown("fakeblockhash0") ); @@ -62,10 +62,5 @@ public void testIsBlockHashKnown() { false, stateService.isBlockHashKnown("fakeblockhash4") ); - Assert.assertEquals( - "Block that was added along with more blocks should exist.", - true, - stateService.isBlockHashKnown("fakeblockhash3") - ); } } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index 0f913e32520..716673048d3 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -44,7 +44,6 @@ import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -281,8 +280,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { // We need to take care that the update period between releases stay short as with the current // situation before 0.9 release we receive 4000 objects with a newly installed client, which // causes the application to stay stuck for quite a while at startup. - log.info("Start processing {} delayedItems.", processDelayedItems.size()); - long startTs = new Date().getTime(); + log.info("Start processing {} items.", processDelayedItems.size()); processDelayedItems.forEach(item -> { if (item instanceof ProtectedStorageEntry) dataStorage.addProtectedStorageEntry((ProtectedStorageEntry) item, sender, null, @@ -291,7 +289,6 @@ else if (item instanceof PersistableNetworkPayload) dataStorage.addPersistableNetworkPayload((PersistableNetworkPayload) item, sender, false, false, false, false); }); - log.info("Processing delayedItems completed after {} sec.", (new Date().getTime() - startTs) / 1000D); cleanup(); listener.onComplete();