Skip to content

Commit

Permalink
Merge bitcoin#13033: Build txindex in parallel with validation
Browse files Browse the repository at this point in the history
9b27047 [doc] Include txindex changes in the release notes. (Jim Posen)
ed77dd6 [test] Simple unit test for TxIndex. (Jim Posen)
6d772a3 [rpc] Public interfaces to GetTransaction block until synced. (Jim Posen)
a03f804 [index] Move disk IO logic from GetTransaction to TxIndex::FindTx. (Jim Posen)
e0a3b80 [validation] Replace tx index code in validation code with TxIndex. (Jim Posen)
8181db8 [init] Initialize and start TxIndex in init code. (Jim Posen)
f90c3a6 [index] TxIndex method to wait until caught up. (Jim Posen)
70d510d [index] Allow TxIndex sync thread to be interrupted. (Jim Posen)
94b4f8b [index] TxIndex initial sync thread. (Jim Posen)
34d68bf [index] Create new TxIndex class. (Jim Posen)
c88bcec [db] Migration for txindex data to new, separate database. (Jim Posen)
0cb8303 [db] Create separate database for txindex. (Jim Posen)

Pull request description:

  I'm re-opening bitcoin#11857 as a new pull request because the last one stopped loading for people

  -------------------------------

  This refactors the tx index code to be in it's own class and get built concurrently with validation code. The main benefit is decoupling and moving the txindex into a separate DB. The primary motivation is to lay the groundwork for other indexers that might be desired (such as the [compact filters](bitcoin/bips#636)). The basic idea is that the TxIndex spins up its own thread, which first syncs the txindex to the current block index, then once in sync the BlockConnected ValidationInterface hook writes new blocks.

  ### DB changes

  At the suggestion of some other developers, the txindex has been split out into a separate database. A data migration runs at startup on any nodes with a legacy txindex. Currently the migration blocks node initialization until complete.

  ### Open questions

  - Should the migration of txindex data from the old DB to the new DB block in init or should it happen in a background thread? The downside to backgrounding it is that `getrawtransaction` would return an error message saying the txindex is syncing while the migration is running.

  ### Impact

  In a sample size n=1 test where I synced nodes from scratch, the average time [Index writing](https://github.com/bitcoin/bitcoin/blob/master/src/validation.cpp#L1903) was 3.36ms in master and 1.72ms in this branch. The average time between `UpdateTip` log lines for sequential blocks between 400,000 and IBD end on mainnet was 0.297204s in master and 0.286134s in this branch. Most likely this is just variance in IBD times, but I can try with some more trials if people want.

Tree-SHA512: 451fd7d95df89dfafceaa723cdf0f7b137615b531cf5c5035cfb54e9ccc2026cec5ac85edbcf71b7f4e2f102e36e9202b8b3a667e1504a9e1a9976ab1f0079c4
  • Loading branch information
sipa authored and UdjinM6 committed Jun 5, 2021
1 parent 8529695 commit 0f9f133
Show file tree
Hide file tree
Showing 28 changed files with 877 additions and 105 deletions.
1 change: 1 addition & 0 deletions doc/files.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* evodb/*: special txes and quorums database
* fee_estimates.dat: stores statistics used to estimate minimum transaction fees and priorities required for confirmation
* governance.dat: stores data for governance obgects
* indexes/txindex/*: optional transaction index database (LevelDB); since 0.17.0
* llmq/*: quorum signatures database
* mempool.dat: dump of the mempool's transactions
* mncache.dat: stores data for masternode list
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ BITCOIN_CORE_H = \
fs.h \
httprpc.h \
httpserver.h \
index/txindex.h \
indirectmap.h \
init.h \
interfaces/handler.h \
Expand Down Expand Up @@ -316,6 +317,7 @@ libdash_server_a_SOURCES = \
evo/specialtx.cpp \
httprpc.cpp \
httpserver.cpp \
index/txindex.cpp \
init.cpp \
dbwrapper.cpp \
governance/governance.cpp \
Expand Down
1 change: 1 addition & 0 deletions src/Makefile.test.include
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ BITCOIN_TESTS =\
test/timedata_tests.cpp \
test/torcontrol_tests.cpp \
test/transaction_tests.cpp \
test/txindex_tests.cpp \
test/txvalidation_tests.cpp \
test/txvalidationcache_tests.cpp \
test/versionbits_tests.cpp \
Expand Down
3 changes: 3 additions & 0 deletions src/dbwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ class CDBWrapper
CDBWrapper(const fs::path& path, size_t nCacheSize, bool fMemory = false, bool fWipe = false, bool obfuscate = false);
~CDBWrapper();

CDBWrapper(const CDBWrapper&) = delete;
CDBWrapper& operator=(const CDBWrapper&) = delete;

template <typename K>
bool ReadDataStream(const K& key, CDataStream& ssValue) const
{
Expand Down
315 changes: 315 additions & 0 deletions src/index/txindex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
// Copyright (c) 2017-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <chainparams.h>
#include <index/txindex.h>
#include <init.h>
#include <tinyformat.h>
#include <ui_interface.h>
#include <util.h>
#include <validation.h>
#include <warnings.h>

constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds
constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds

std::unique_ptr<TxIndex> g_txindex;

template<typename... Args>
static void FatalError(const char* fmt, const Args&... args)
{
std::string strMessage = tfm::format(fmt, args...);
SetMiscWarning(strMessage);
LogPrintf("*** %s\n", strMessage);
uiInterface.ThreadSafeMessageBox(
"Error: A fatal internal error occurred, see debug.log for details",
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
}

TxIndex::TxIndex(std::unique_ptr<TxIndexDB> db) :
m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr)
{}

TxIndex::~TxIndex()
{
Interrupt();
Stop();
}

bool TxIndex::Init()
{
LOCK(cs_main);

// Attempt to migrate txindex from the old database to the new one. Even if
// chain_tip is null, the node could be reindexing and we still want to
// delete txindex records in the old database.
if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) {
return false;
}

CBlockLocator locator;
if (!m_db->ReadBestBlock(locator)) {
locator.SetNull();
}

m_best_block_index = FindForkInGlobalIndex(chainActive, locator);
m_synced = m_best_block_index.load() == chainActive.Tip();
return true;
}

static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev)
{
AssertLockHeld(cs_main);

if (!pindex_prev) {
return chainActive.Genesis();
}

const CBlockIndex* pindex = chainActive.Next(pindex_prev);
if (pindex) {
return pindex;
}

return chainActive.Next(chainActive.FindFork(pindex_prev));
}

void TxIndex::ThreadSync()
{
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
auto& consensus_params = Params().GetConsensus();

int64_t last_log_time = 0;
int64_t last_locator_write_time = 0;
while (true) {
if (m_interrupt) {
WriteBestBlock(pindex);
return;
}

{
LOCK(cs_main);
const CBlockIndex* pindex_next = NextSyncBlock(pindex);
if (!pindex_next) {
WriteBestBlock(pindex);
m_best_block_index = pindex;
m_synced = true;
break;
}
pindex = pindex_next;
}

int64_t current_time = GetTime();
if (last_log_time + SYNC_LOG_INTERVAL < current_time) {
LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight);
last_log_time = current_time;
}

if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
WriteBestBlock(pindex);
last_locator_write_time = current_time;
}

CBlock block;
if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
FatalError("%s: Failed to read block %s from disk",
__func__, pindex->GetBlockHash().ToString());
return;
}
if (!WriteBlock(block, pindex)) {
FatalError("%s: Failed to write block %s to tx index database",
__func__, pindex->GetBlockHash().ToString());
return;
}
}
}

if (pindex) {
LogPrintf("txindex is enabled at height %d\n", pindex->nHeight);
} else {
LogPrintf("txindex is enabled\n");
}
}

bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex)
{
CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size()));
std::vector<std::pair<uint256, CDiskTxPos>> vPos;
vPos.reserve(block.vtx.size());
for (const auto& tx : block.vtx) {
vPos.emplace_back(tx->GetHash(), pos);
pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION);
}
return m_db->WriteTxs(vPos);
}

bool TxIndex::WriteBestBlock(const CBlockIndex* block_index)
{
LOCK(cs_main);
if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) {
return error("%s: Failed to write locator to disk", __func__);
}
return true;
}

void TxIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
const std::vector<CTransactionRef>& txn_conflicted)
{
if (!m_synced) {
return;
}

const CBlockIndex* best_block_index = m_best_block_index.load();
if (!best_block_index) {
if (pindex->nHeight != 0) {
FatalError("%s: First block connected is not the genesis block (height=%d)",
__func__, pindex->nHeight);
return;
}
} else {
// Ensure block connects to an ancestor of the current best block. This should be the case
// most of the time, but may not be immediately after the the sync thread catches up and sets
// m_synced. Consider the case where there is a reorg and the blocks on the stale branch are
// in the ValidationInterface queue backlog even after the sync thread has caught up to the
// new chain tip. In this unlikely event, log a warning and let the queue clear.
if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) {
LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */
"known best chain (tip=%s); not updating txindex\n",
__func__, pindex->GetBlockHash().ToString(),
best_block_index->GetBlockHash().ToString());
return;
}
}

if (WriteBlock(*block, pindex)) {
m_best_block_index = pindex;
} else {
FatalError("%s: Failed to write block %s to txindex",
__func__, pindex->GetBlockHash().ToString());
return;
}
}

void TxIndex::SetBestChain(const CBlockLocator& locator)
{
if (!m_synced) {
return;
}

const uint256& locator_tip_hash = locator.vHave.front();
const CBlockIndex* locator_tip_index;
{
LOCK(cs_main);
locator_tip_index = LookupBlockIndex(locator_tip_hash);
}

if (!locator_tip_index) {
FatalError("%s: First block (hash=%s) in locator was not found",
__func__, locator_tip_hash.ToString());
return;
}

// This checks that SetBestChain callbacks are received after BlockConnected. The check may fail
// immediately after the the sync thread catches up and sets m_synced. Consider the case where
// there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
// backlog even after the sync thread has caught up to the new chain tip. In this unlikely
// event, log a warning and let the queue clear.
const CBlockIndex* best_block_index = m_best_block_index.load();
if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */
"chain (tip=%s); not writing txindex locator\n",
__func__, locator_tip_hash.ToString(),
best_block_index->GetBlockHash().ToString());
return;
}

if (!m_db->WriteBestBlock(locator)) {
error("%s: Failed to write locator to disk", __func__);
}
}

bool TxIndex::BlockUntilSyncedToCurrentChain()
{
AssertLockNotHeld(cs_main);

if (!m_synced) {
return false;
}

{
// Skip the queue-draining stuff if we know we're caught up with
// chainActive.Tip().
LOCK(cs_main);
const CBlockIndex* chain_tip = chainActive.Tip();
const CBlockIndex* best_block_index = m_best_block_index.load();
if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) {
return true;
}
}

LogPrintf("%s: txindex is catching up on block notifications\n", __func__);
SyncWithValidationInterfaceQueue();
return true;
}

bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const
{
CDiskTxPos postx;
if (!m_db->ReadTxPos(tx_hash, postx)) {
return false;
}

CAutoFile file(OpenBlockFile(postx, true), SER_DISK, CLIENT_VERSION);
if (file.IsNull()) {
return error("%s: OpenBlockFile failed", __func__);
}
CBlockHeader header;
try {
file >> header;
fseek(file.Get(), postx.nTxOffset, SEEK_CUR);
file >> tx;
} catch (const std::exception& e) {
return error("%s: Deserialize or I/O error - %s", __func__, e.what());
}
if (tx->GetHash() != tx_hash) {
return error("%s: txid mismatch", __func__);
}
block_hash = header.GetHash();
return true;
}

bool TxIndex::HasTx(const uint256& tx_hash) const
{
CDiskTxPos postx;
return m_db->ReadTxPos(tx_hash, postx);
}

void TxIndex::Interrupt()
{
m_interrupt();
}

void TxIndex::Start()
{
// Need to register this ValidationInterface before running Init(), so that
// callbacks are not missed if Init sets m_synced to true.
RegisterValidationInterface(this);
if (!Init()) {
FatalError("%s: txindex failed to initialize", __func__);
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, "txindex",
std::bind(&TxIndex::ThreadSync, this));
}

void TxIndex::Stop()
{
UnregisterValidationInterface(this);

if (m_thread_sync.joinable()) {
m_thread_sync.join();
}
}
Loading

0 comments on commit 0f9f133

Please sign in to comment.