diff --git a/common/src/main/java/bisq/common/file/JsonFileManager.java b/common/src/main/java/bisq/common/file/JsonFileManager.java index 1d1db8a9d88..4a10f508e3b 100644 --- a/common/src/main/java/bisq/common/file/JsonFileManager.java +++ b/common/src/main/java/bisq/common/file/JsonFileManager.java @@ -24,13 +24,26 @@ import java.io.File; import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + @Slf4j public class JsonFileManager { - private final ThreadPoolExecutor executor; + private final static List INSTANCES = new ArrayList<>(); + + public static void shutDownAllInstances() { + INSTANCES.forEach(JsonFileManager::shutDown); + } + + + @Nullable + private ThreadPoolExecutor executor; private final File dir; @@ -41,54 +54,62 @@ public class JsonFileManager { public JsonFileManager(File dir) { this.dir = dir; - this.executor = Utilities.getThreadPoolExecutor("JsonFileManagerExecutor", 5, 50, 60); + if (!dir.exists() && !dir.mkdir()) { + log.warn("make dir failed"); + } - if (!dir.exists()) - if (!dir.mkdir()) - log.warn("make dir failed"); + INSTANCES.add(this); + } - Runtime.getRuntime().addShutdownHook(new Thread(JsonFileManager.this::shutDown, - "JsonFileManager.ShutDownHook")); + @NotNull + protected ThreadPoolExecutor getExecutor() { + if (executor == null) { + executor = Utilities.getThreadPoolExecutor("JsonFileManagerExecutor", 5, 50, 60); + } + return executor; } public void shutDown() { - executor.shutdown(); + if (executor != null) { + executor.shutdown(); + } } - public void writeToDisc(String json, String fileName) { - executor.execute(() -> { - File jsonFile = new File(Paths.get(dir.getAbsolutePath(), fileName + ".json").toString()); - File tempFile = null; - PrintWriter printWriter = null; - try { - tempFile = File.createTempFile("temp", null, dir); - if (!executor.isShutdown() && !executor.isTerminated() && !executor.isTerminating()) - tempFile.deleteOnExit(); - - printWriter = new PrintWriter(tempFile); - printWriter.println(json); - - // This close call and comment is borrowed from FileManager. Not 100% sure it that is really needed but - // seems that had fixed in the past and we got reported issues on Windows so that fix might be still - // required. - // Close resources before replacing file with temp file because otherwise it causes problems on windows - // when rename temp file - printWriter.close(); + public void writeToDiscThreaded(String json, String fileName) { + getExecutor().execute(() -> writeToDisc(json, fileName)); + } - FileUtil.renameFile(tempFile, jsonFile); - } catch (Throwable t) { - log.error("storageFile " + jsonFile.toString()); - t.printStackTrace(); - } finally { - if (tempFile != null && tempFile.exists()) { - log.warn("Temp file still exists after failed save. We will delete it now. storageFile=" + fileName); - if (!tempFile.delete()) - log.error("Cannot delete temp file."); - } - - if (printWriter != null) - printWriter.close(); + public void writeToDisc(String json, String fileName) { + File jsonFile = new File(Paths.get(dir.getAbsolutePath(), fileName + ".json").toString()); + File tempFile = null; + PrintWriter printWriter = null; + try { + tempFile = File.createTempFile("temp", null, dir); + tempFile.deleteOnExit(); + + printWriter = new PrintWriter(tempFile); + printWriter.println(json); + + // This close call and comment is borrowed from FileManager. Not 100% sure it that is really needed but + // seems that had fixed in the past and we got reported issues on Windows so that fix might be still + // required. + // Close resources before replacing file with temp file because otherwise it causes problems on windows + // when rename temp file + printWriter.close(); + + FileUtil.renameFile(tempFile, jsonFile); + } catch (Throwable t) { + log.error("storageFile " + jsonFile.toString()); + t.printStackTrace(); + } finally { + if (tempFile != null && tempFile.exists()) { + log.warn("Temp file still exists after failed save. We will delete it now. storageFile=" + fileName); + if (!tempFile.delete()) + log.error("Cannot delete temp file."); } - }); + + if (printWriter != null) + printWriter.close(); + } } } diff --git a/core/src/main/java/bisq/core/app/misc/ExecutableForAppWithP2p.java b/core/src/main/java/bisq/core/app/misc/ExecutableForAppWithP2p.java index 8a22a56bcae..ef2c7dffaf2 100644 --- a/core/src/main/java/bisq/core/app/misc/ExecutableForAppWithP2p.java +++ b/core/src/main/java/bisq/core/app/misc/ExecutableForAppWithP2p.java @@ -21,6 +21,7 @@ import bisq.core.btc.setup.WalletsSetup; import bisq.core.btc.wallet.BsqWalletService; import bisq.core.btc.wallet.BtcWalletService; +import bisq.core.dao.DaoSetup; import bisq.core.offer.OpenOfferManager; import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager; @@ -31,6 +32,7 @@ import bisq.common.UserThread; import bisq.common.app.DevEnv; import bisq.common.config.Config; +import bisq.common.file.JsonFileManager; import bisq.common.handlers.ResultHandler; import bisq.common.persistence.PersistenceManager; import bisq.common.setup.GracefulShutDownHandler; @@ -83,6 +85,8 @@ public void gracefulShutDown(ResultHandler resultHandler) { log.info("gracefulShutDown"); try { if (injector != null) { + JsonFileManager.shutDownAllInstances(); + injector.getInstance(DaoSetup.class).shutDown(); injector.getInstance(ArbitratorManager.class).shutDown(); injector.getInstance(OpenOfferManager.class).shutDown(() -> injector.getInstance(P2PService.class).shutDown(() -> { injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> { diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index e06be387575..fd44b875338 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -209,7 +209,7 @@ protected void onParseBlockChainComplete() { parseBlockchainComplete = true; daoStateService.onParseBlockChainComplete(); - maybeExportToJson(); + exportJsonFilesService.onParseBlockChainComplete(); } @SuppressWarnings("WeakerAccess") @@ -291,7 +291,7 @@ protected Optional doParseBlock(RawBlock rawBlock) throws RequiredReorgFr return Optional.empty(); } - protected void maybeExportToJson() { - exportJsonFilesService.maybeExportToJson(); + protected void maybeExportNewBlockToJson(Block block) { + exportJsonFilesService.onNewBlock(block); } } diff --git a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java index 1c9c605f04d..193218f63fb 100644 --- a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java @@ -19,7 +19,6 @@ import bisq.core.dao.DaoSetupService; import bisq.core.dao.state.DaoStateService; -import bisq.core.dao.state.model.DaoState; import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.blockchain.PubKeyScript; import bisq.core.dao.state.model.blockchain.Tx; @@ -27,7 +26,6 @@ import bisq.core.dao.state.model.blockchain.TxType; import bisq.common.config.Config; -import bisq.common.file.FileUtil; import bisq.common.file.JsonFileManager; import bisq.common.util.Utilities; @@ -37,18 +35,11 @@ import javax.inject.Named; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - import java.nio.file.Paths; import java.io.File; -import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -56,17 +47,13 @@ import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; - @Slf4j public class ExportJsonFilesService implements DaoSetupService { private final DaoStateService daoStateService; private final File storageDir; - private final boolean dumpBlockchainData; - - private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", - 1, 1, 1200); - private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; + private boolean dumpBlockchainData; + private JsonFileManager blockFileManager, txFileManager, txOutputFileManager, bsqStateFileManager; + private File blockDir; @Inject public ExportJsonFilesService(DaoStateService daoStateService, @@ -88,88 +75,135 @@ public void addListeners() { @Override public void start() { - if (dumpBlockchainData) { - File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString()); - File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString()); - File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString()); - File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString()); - try { - if (txDir.exists()) - FileUtil.deleteDirectory(txDir); - if (txOutputDir.exists()) - FileUtil.deleteDirectory(txOutputDir); - if (bsqStateDir.exists()) - FileUtil.deleteDirectory(bsqStateDir); - if (jsonDir.exists()) - FileUtil.deleteDirectory(jsonDir); - } catch (IOException e) { - log.error(e.toString()); - e.printStackTrace(); - } + if (!dumpBlockchainData) { + return; + } - if (!jsonDir.mkdir()) - log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath()); + File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString()); + blockDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "block").toString()); + File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString()); + File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString()); + File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString()); - if (!txDir.mkdir()) - log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); + if (!jsonDir.mkdir()) + log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath()); - if (!txOutputDir.mkdir()) - log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); + if (!blockDir.mkdir()) + log.warn("make blockDir failed.\njsonDir=" + blockDir.getAbsolutePath()); - if (!bsqStateDir.mkdir()) - log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath()); + if (!txDir.mkdir()) + log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath()); - txFileManager = new JsonFileManager(txDir); - txOutputFileManager = new JsonFileManager(txOutputDir); - bsqStateFileManager = new JsonFileManager(bsqStateDir); - } + if (!txOutputDir.mkdir()) + log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath()); + + if (!bsqStateDir.mkdir()) + log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath()); + + blockFileManager = new JsonFileManager(blockDir); + txFileManager = new JsonFileManager(txDir); + txOutputFileManager = new JsonFileManager(txOutputDir); + bsqStateFileManager = new JsonFileManager(bsqStateDir); } public void shutDown() { - if (dumpBlockchainData && txFileManager != null) { - txFileManager.shutDown(); - txOutputFileManager.shutDown(); - bsqStateFileManager.shutDown(); + if (!dumpBlockchainData) { + return; } + + blockFileManager.shutDown(); + txFileManager.shutDown(); + txOutputFileManager.shutDown(); + bsqStateFileManager.shutDown(); + dumpBlockchainData = false; } - public void maybeExportToJson() { - if (dumpBlockchainData && - daoStateService.isParseBlockChainComplete()) { - // We store the data we need once we write the data to disk (in the thread) locally. - // Access to daoStateService is single threaded, we must not access daoStateService from the thread. - List allJsonTxOutputs = new ArrayList<>(); - - List jsonTxs = daoStateService.getUnorderedTxStream() - .map(tx -> { - JsonTx jsonTx = getJsonTx(tx); - allJsonTxOutputs.addAll(jsonTx.getOutputs()); - return jsonTx; - }).collect(Collectors.toList()); - - DaoState daoState = daoStateService.getClone(); - List jsonBlockList = daoState.getBlocks().stream() - .map(this::getJsonBlock) - .collect(Collectors.toList()); - JsonBlocks jsonBlocks = new JsonBlocks(daoState.getChainHeight(), jsonBlockList); + public void onNewBlock(Block block) { + if (!dumpBlockchainData) { + return; + } - ListenableFuture future = executor.submit(() -> { - bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks"); - allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); - jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId())); - return null; - }); + // We do write the block on the main thread as the overhead to create a thread and risk for inconsistency is not + // worth the potential performance gain. + processBlock(block, true); + } - Futures.addCallback(future, new FutureCallback<>() { - public void onSuccess(Void ignore) { - } + private void processBlock(Block block, boolean doDumpDaoState) { + int lastPersistedBlock = getLastPersistedBlock(); + if (block.getHeight() <= lastPersistedBlock) { + return; + } + + long ts = System.currentTimeMillis(); + JsonBlock jsonBlock = getJsonBlock(block); + blockFileManager.writeToDisc(Utilities.objectToJson(jsonBlock), String.valueOf(jsonBlock.getHeight())); + + jsonBlock.getTxs().forEach(jsonTx -> { + txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()); + + jsonTx.getOutputs().forEach(jsonTxOutput -> + txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId())); + }); - public void onFailure(@NotNull Throwable throwable) { - log.error(throwable.toString()); - throwable.printStackTrace(); + log.info("Write json data for block {} took {} ms", block.getHeight(), System.currentTimeMillis() - ts); + + if (doDumpDaoState) { + dumpDaoState(); + } + } + + public void onParseBlockChainComplete() { + if (!dumpBlockchainData) { + return; + } + + int lastPersistedBlock = getLastPersistedBlock(); + List blocks = daoStateService.getBlocksFromBlockHeight(lastPersistedBlock + 1, Integer.MAX_VALUE); + + // We use a thread here to write all past blocks to avoid that the main thread gets blocked for too long. + new Thread(() -> { + Thread.currentThread().setName("Write all blocks to json"); + blocks.forEach(e -> processBlock(e, false)); + }).start(); + + dumpDaoState(); + } + + private void dumpDaoState() { + // TODO we should get rid of that data structure and use the individual jsonBlocks instead as we cannot cache data + // here and re-write each time the full blockchain which is already > 200 MB + // Once the webapp has impl the changes we can delete that here. + long ts = System.currentTimeMillis(); + List jsonBlockList = daoStateService.getBlocks().stream() + .map(this::getJsonBlock) + .collect(Collectors.toList()); + JsonBlocks jsonBlocks = new JsonBlocks(daoStateService.getChainHeight(), jsonBlockList); + + // We use here the thread write method as the data is quite large and write can take a bit + bsqStateFileManager.writeToDiscThreaded(Utilities.objectToJson(jsonBlocks), "blocks"); + log.info("Dumping full bsqState with {} blocks took {} ms", + jsonBlocks.getBlocks().size(), System.currentTimeMillis() - ts); + } + + private int getLastPersistedBlock() { + // At start we use one block before genesis + int result = daoStateService.getGenesisBlockHeight() - 1; + String[] list = blockDir.list(); + if (list != null && list.length > 0) { + List blocks = Arrays.stream(list) + .filter(e -> !e.endsWith(".tmp")) + .map(e -> e.replace(".json", "")) + .map(Integer::valueOf) + .sorted() + .collect(Collectors.toList()); + if (!blocks.isEmpty()) { + Integer lastBlockHeight = blocks.get(blocks.size() - 1); + if (lastBlockHeight > result) { + result = lastBlockHeight; } - }, MoreExecutors.directExecutor()); + } } + return result; } private JsonBlock getJsonBlock(Block block) { diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index 91e9d41362d..f4e42dd7932 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -168,7 +168,7 @@ private void addBlockHandler() { } private void onNewBlock(Block block) { - maybeExportToJson(); + maybeExportNewBlockToJson(block); if (p2pNetworkReady && parseBlockchainComplete) fullNodeNetworkService.publishNewBlock(block); diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index a51c5a11235..99cb779682b 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -28,6 +28,7 @@ import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.DaoStateSnapshotService; +import bisq.core.dao.state.model.blockchain.Block; import bisq.network.p2p.P2PService; import bisq.network.p2p.network.Connection; @@ -39,6 +40,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -227,19 +229,18 @@ private void runDelayedBatchProcessing(List blocks, Runnable resultHan } // We received a new block - private void onNewBlockReceived(RawBlock block) { - int blockHeight = block.getHeight(); - log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash()); + private void onNewBlockReceived(RawBlock rawBlock) { + int blockHeight = rawBlock.getHeight(); + log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, rawBlock.getHash()); // We only update chainTipHeight if we get a newer block if (blockHeight > chainTipHeight) chainTipHeight = blockHeight; try { - doParseBlock(block); + Optional optionalBlock = doParseBlock(rawBlock); + optionalBlock.ifPresent(this::maybeExportNewBlockToJson); } catch (RequiredReorgFromSnapshotException ignore) { } - - maybeExportToJson(); } } diff --git a/core/src/main/java/bisq/core/offer/OfferBookService.java b/core/src/main/java/bisq/core/offer/OfferBookService.java index 4176132baa9..66ed7ca2b35 100644 --- a/core/src/main/java/bisq/core/offer/OfferBookService.java +++ b/core/src/main/java/bisq/core/offer/OfferBookService.java @@ -245,6 +245,6 @@ private void doDumpStatistics() { }) .filter(Objects::nonNull) .collect(Collectors.toList()); - jsonFileManager.writeToDisc(Utilities.objectToJson(offerForJsonList), "offers_statistics"); + jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(offerForJsonList), "offers_statistics"); } } diff --git a/core/src/main/java/bisq/core/trade/DumpDelayedPayoutTx.java b/core/src/main/java/bisq/core/trade/DumpDelayedPayoutTx.java index c407126a9db..3d2785c01e2 100644 --- a/core/src/main/java/bisq/core/trade/DumpDelayedPayoutTx.java +++ b/core/src/main/java/bisq/core/trade/DumpDelayedPayoutTx.java @@ -58,7 +58,7 @@ public void maybeDumpDelayedPayoutTxs(TradableList trada .map(trade -> new DelayedPayoutHash(trade.getId(), Utilities.bytesAsHexString(((Trade) trade).getDelayedPayoutTxBytes()))) .collect(Collectors.toList()); - jsonFileManager.writeToDisc(Utilities.objectToJson(delayedPayoutHashes), fileName); + jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(delayedPayoutHashes), fileName); } } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index dedc250f85a..01a978c1a9e 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -123,13 +123,13 @@ private void maybeDumpStatistics() { ArrayList fiatCurrencyList = CurrencyUtil.getAllSortedFiatCurrencies().stream() .map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8)) .collect(Collectors.toCollection(ArrayList::new)); - jsonFileManager.writeToDisc(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list"); + jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(fiatCurrencyList), "fiat_currency_list"); ArrayList cryptoCurrencyList = CurrencyUtil.getAllSortedCryptoCurrencies().stream() .map(e -> new CurrencyTuple(e.getCode(), e.getName(), 8)) .collect(Collectors.toCollection(ArrayList::new)); cryptoCurrencyList.add(0, new CurrencyTuple(Res.getBaseCurrencyCode(), Res.getBaseCurrencyName(), 8)); - jsonFileManager.writeToDisc(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list"); + jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(cryptoCurrencyList), "crypto_currency_list"); } List list = observableTradeStatisticsSet.stream() @@ -138,6 +138,6 @@ private void maybeDumpStatistics() { .collect(Collectors.toList()); TradeStatisticsForJson[] array = new TradeStatisticsForJson[list.size()]; list.toArray(array); - jsonFileManager.writeToDisc(Utilities.objectToJson(array), "trade_statistics"); + jsonFileManager.writeToDiscThreaded(Utilities.objectToJson(array), "trade_statistics"); } }