diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index fd44b875338..e06be387575 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -209,7 +209,7 @@ protected void onParseBlockChainComplete() { parseBlockchainComplete = true; daoStateService.onParseBlockChainComplete(); - exportJsonFilesService.onParseBlockChainComplete(); + maybeExportToJson(); } @SuppressWarnings("WeakerAccess") @@ -291,7 +291,7 @@ protected Optional doParseBlock(RawBlock rawBlock) throws RequiredReorgFr return Optional.empty(); } - protected void maybeExportNewBlockToJson(Block block) { - exportJsonFilesService.onNewBlock(block); + protected void maybeExportToJson() { + exportJsonFilesService.maybeExportToJson(); } } diff --git a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java index 193218f63fb..1c9c605f04d 100644 --- a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java @@ -19,6 +19,7 @@ import bisq.core.dao.DaoSetupService; import bisq.core.dao.state.DaoStateService; +import bisq.core.dao.state.model.DaoState; import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.blockchain.PubKeyScript; import bisq.core.dao.state.model.blockchain.Tx; @@ -26,6 +27,7 @@ import bisq.core.dao.state.model.blockchain.TxType; import bisq.common.config.Config; +import bisq.common.file.FileUtil; import bisq.common.file.JsonFileManager; import bisq.common.util.Utilities; @@ -35,11 +37,18 @@ import javax.inject.Named; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + import java.nio.file.Paths; import java.io.File; +import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -47,13 +56,17 @@ import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + @Slf4j public class ExportJsonFilesService implements DaoSetupService { private final DaoStateService daoStateService; private final File storageDir; - private boolean dumpBlockchainData; - private JsonFileManager blockFileManager, txFileManager, txOutputFileManager, bsqStateFileManager; - private File blockDir; + private final boolean dumpBlockchainData; + + private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", + 1, 1, 1200); + private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; @Inject public ExportJsonFilesService(DaoStateService daoStateService, @@ -75,135 +88,88 @@ public void addListeners() { @Override public void start() { - if (!dumpBlockchainData) { - return; - } - - File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString()); - blockDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "block").toString()); - File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString()); - File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString()); - File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString()); - - if (!jsonDir.mkdir()) - log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath()); - - if (!blockDir.mkdir()) - log.warn("make blockDir failed.\njsonDir=" + blockDir.getAbsolutePath()); - - if (!txDir.mkdir()) - log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); - - if (!txOutputDir.mkdir()) - log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); + if (dumpBlockchainData) { + File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString()); + File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString()); + File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString()); + File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString()); + try { + if (txDir.exists()) + FileUtil.deleteDirectory(txDir); + if (txOutputDir.exists()) + FileUtil.deleteDirectory(txOutputDir); + if (bsqStateDir.exists()) + FileUtil.deleteDirectory(bsqStateDir); + if (jsonDir.exists()) + FileUtil.deleteDirectory(jsonDir); + } catch (IOException e) { + log.error(e.toString()); + e.printStackTrace(); + } - if (!bsqStateDir.mkdir()) - log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath()); + if (!jsonDir.mkdir()) + log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath()); - blockFileManager = new JsonFileManager(blockDir); - txFileManager = new JsonFileManager(txDir); - txOutputFileManager = new JsonFileManager(txOutputDir); - bsqStateFileManager = new JsonFileManager(bsqStateDir); - } + if (!txDir.mkdir()) + log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); - public void shutDown() { - if (!dumpBlockchainData) { - return; - } + if (!txOutputDir.mkdir()) + log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); - blockFileManager.shutDown(); - txFileManager.shutDown(); - txOutputFileManager.shutDown(); - bsqStateFileManager.shutDown(); - dumpBlockchainData = false; - } + if (!bsqStateDir.mkdir()) + log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath()); - public void onNewBlock(Block block) { - if (!dumpBlockchainData) { - return; + txFileManager = new JsonFileManager(txDir); + txOutputFileManager = new JsonFileManager(txOutputDir); + bsqStateFileManager = new JsonFileManager(bsqStateDir); } - - // We do write the block on the main thread as the overhead to create a thread and risk for inconsistency is not - // worth the potential performance gain. - processBlock(block, true); } - private void processBlock(Block block, boolean doDumpDaoState) { - int lastPersistedBlock = getLastPersistedBlock(); - if (block.getHeight() <= lastPersistedBlock) { - return; - } - - long ts = System.currentTimeMillis(); - JsonBlock jsonBlock = getJsonBlock(block); - blockFileManager.writeToDisc(Utilities.objectToJson(jsonBlock), String.valueOf(jsonBlock.getHeight())); - - jsonBlock.getTxs().forEach(jsonTx -> { - txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()); - - jsonTx.getOutputs().forEach(jsonTxOutput -> - txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); - }); - - log.info("Write json data for block {} took {} ms", block.getHeight(), System.currentTimeMillis() - ts); - - if (doDumpDaoState) { - dumpDaoState(); + public void shutDown() { + if (dumpBlockchainData && txFileManager != null) { + txFileManager.shutDown(); + txOutputFileManager.shutDown(); + bsqStateFileManager.shutDown(); } } - public void onParseBlockChainComplete() { - if (!dumpBlockchainData) { - return; - } - - int lastPersistedBlock = getLastPersistedBlock(); - List blocks = daoStateService.getBlocksFromBlockHeight(lastPersistedBlock + 1, Integer.MAX_VALUE); - - // We use a thread here to write all past blocks to avoid that the main thread gets blocked for too long. - new Thread(() -> { - Thread.currentThread().setName("Write all blocks to json"); - blocks.forEach(e -> processBlock(e, false)); - }).start(); - - dumpDaoState(); - } + public void maybeExportToJson() { + if (dumpBlockchainData && + daoStateService.isParseBlockChainComplete()) { + // We store the data we need once we write the data to disk (in the thread) locally. + // Access to daoStateService is single threaded, we must not access daoStateService from the thread. + List allJsonTxOutputs = new ArrayList<>(); + + List jsonTxs = daoStateService.getUnorderedTxStream() + .map(tx -> { + JsonTx jsonTx = getJsonTx(tx); + allJsonTxOutputs.addAll(jsonTx.getOutputs()); + return jsonTx; + }).collect(Collectors.toList()); + + DaoState daoState = daoStateService.getClone(); + List jsonBlockList = daoState.getBlocks().stream() + .map(this::getJsonBlock) + .collect(Collectors.toList()); + JsonBlocks jsonBlocks = new JsonBlocks(daoState.getChainHeight(), jsonBlockList); - private void dumpDaoState() { - // TODO we should get rid of that data structure and use the individual jsonBlocks instead as we cannot cache data - // here and re-write each time the full blockchain which is already > 200 MB - // Once the webapp has impl the changes we can delete that here. - long ts = System.currentTimeMillis(); - List jsonBlockList = daoStateService.getBlocks().stream() - .map(this::getJsonBlock) - .collect(Collectors.toList()); - JsonBlocks jsonBlocks = new JsonBlocks(daoStateService.getChainHeight(), jsonBlockList); + ListenableFuture future = executor.submit(() -> { + bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks"); + allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); + jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId())); + return null; + }); - // We use here the thread write method as the data is quite large and write can take a bit - bsqStateFileManager.writeToDiscThreaded(Utilities.objectToJson(jsonBlocks), "blocks"); - log.info("Dumping full bsqState with {} blocks took {} ms", - jsonBlocks.getBlocks().size(), System.currentTimeMillis() - ts); - } + Futures.addCallback(future, new FutureCallback<>() { + public void onSuccess(Void ignore) { + } - private int getLastPersistedBlock() { - // At start we use one block before genesis - int result = daoStateService.getGenesisBlockHeight() - 1; - String[] list = blockDir.list(); - if (list != null && list.length > 0) { - List blocks = Arrays.stream(list) - .filter(e -> !e.endsWith(".tmp")) - .map(e -> e.replace(".json", "")) - .map(Integer::valueOf) - .sorted() - .collect(Collectors.toList()); - if (!blocks.isEmpty()) { - Integer lastBlockHeight = blocks.get(blocks.size() - 1); - if (lastBlockHeight > result) { - result = lastBlockHeight; + public void onFailure(@NotNull Throwable throwable) { + log.error(throwable.toString()); + throwable.printStackTrace(); } - } + }, MoreExecutors.directExecutor()); } - return result; } private JsonBlock getJsonBlock(Block block) { diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index f4e42dd7932..91e9d41362d 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -168,7 +168,7 @@ private void addBlockHandler() { } private void onNewBlock(Block block) { - maybeExportNewBlockToJson(block); + maybeExportToJson(); if (p2pNetworkReady && parseBlockchainComplete) fullNodeNetworkService.publishNewBlock(block); diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 99cb779682b..a51c5a11235 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -28,7 +28,6 @@ import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.DaoStateSnapshotService; -import bisq.core.dao.state.model.blockchain.Block; import bisq.network.p2p.P2PService; import bisq.network.p2p.network.Connection; @@ -40,7 +39,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -229,18 +227,19 @@ private void runDelayedBatchProcessing(List blocks, Runnable resultHan } // We received a new block - private void onNewBlockReceived(RawBlock rawBlock) { - int blockHeight = rawBlock.getHeight(); - log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, rawBlock.getHash()); + private void onNewBlockReceived(RawBlock block) { + int blockHeight = block.getHeight(); + log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash()); // We only update chainTipHeight if we get a newer block if (blockHeight > chainTipHeight) chainTipHeight = blockHeight; try { - Optional optionalBlock = doParseBlock(rawBlock); - optionalBlock.ifPresent(this::maybeExportNewBlockToJson); + doParseBlock(block); } catch (RequiredReorgFromSnapshotException ignore) { } + + maybeExportToJson(); } }