Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

index: Create IndexRunner class for activing indexes. #14111

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ BITCOIN_CORE_H = \
httprpc.h \
httpserver.h \
index/base.h \
index/error.h \
index/txindex.h \
index/runner.h \
indirectmap.h \
init.h \
interfaces/chain.h \
Expand Down Expand Up @@ -250,6 +252,7 @@ libbitcoin_server_a_SOURCES = \
httprpc.cpp \
httpserver.cpp \
index/base.cpp \
index/runner.cpp \
index/txindex.cpp \
interfaces/chain.cpp \
interfaces/handler.cpp \
Expand Down
56 changes: 4 additions & 52 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,16 @@

#include <chainparams.h>
#include <index/base.h>
#include <shutdown.h>
#include <tinyformat.h>
#include <ui_interface.h>
#include <index/error.h>
#include <threadinterrupt.h>
#include <util/system.h>
#include <validation.h>
#include <warnings.h>

constexpr char DB_BEST_BLOCK = 'B';

constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds
constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds

template<typename... Args>
static void FatalError(const char* fmt, const Args&... args)
{
std::string strMessage = tfm::format(fmt, args...);
SetMiscWarning(strMessage);
LogPrintf("*** %s\n", strMessage);
uiInterface.ThreadSafeMessageBox(
"Error: A fatal internal error occurred, see debug.log for details",
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
}

BaseIndex::DB::DB(const fs::path& path, size_t n_cache_size, bool f_memory, bool f_wipe, bool f_obfuscate) :
CDBWrapper(path, n_cache_size, f_memory, f_wipe, f_obfuscate)
{}
Expand All @@ -46,12 +32,6 @@ bool BaseIndex::DB::WriteBestBlock(const CBlockLocator& locator)
return Write(DB_BEST_BLOCK, locator);
}

BaseIndex::~BaseIndex()
{
Interrupt();
Stop();
}

bool BaseIndex::Init()
{
CBlockLocator locator;
Expand Down Expand Up @@ -85,7 +65,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev) EXCLUSIV
return chainActive.Next(chainActive.FindFork(pindex_prev));
}

void BaseIndex::ThreadSync()
void BaseIndex::ThreadSync(CThreadInterrupt* interrupt)
{
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
Expand All @@ -94,7 +74,7 @@ void BaseIndex::ThreadSync()
int64_t last_log_time = 0;
int64_t last_locator_write_time = 0;
while (true) {
if (m_interrupt) {
if (interrupt && *interrupt) {
WriteBestBlock(pindex);
return;
}
Expand Down Expand Up @@ -252,31 +232,3 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain()
SyncWithValidationInterfaceQueue();
return true;
}

void BaseIndex::Interrupt()
{
m_interrupt();
}

void BaseIndex::Start()
{
// Need to register this ValidationInterface before running Init(), so that
// callbacks are not missed if Init sets m_synced to true.
RegisterValidationInterface(this);
if (!Init()) {
FatalError("%s: %s failed to initialize", __func__, GetName());
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, GetName(),
std::bind(&BaseIndex::ThreadSync, this));
}

void BaseIndex::Stop()
{
UnregisterValidationInterface(this);

if (m_thread_sync.joinable()) {
m_thread_sync.join();
}
}
27 changes: 8 additions & 19 deletions src/index/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include <dbwrapper.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <threadinterrupt.h>
#include <uint256.h>
#include <validationinterface.h>

class CBlockIndex;
class CThreadInterrupt;

/**
* Base class for indices of blockchain data. This implements
Expand All @@ -21,6 +21,8 @@ class CBlockIndex;
*/
class BaseIndex : public CValidationInterface
{
friend class IndexRunner;

protected:
class DB : public CDBWrapper
{
Expand All @@ -44,15 +46,12 @@ class BaseIndex : public CValidationInterface
/// The last block in the chain that the index is in sync with.
std::atomic<const CBlockIndex*> m_best_block_index{nullptr};

std::thread m_thread_sync;
CThreadInterrupt m_interrupt;

/// Sync the index with the block index starting from the current best block.
/// Intended to be run in its own thread, m_thread_sync, and can be
/// interrupted with m_interrupt. Once the index gets in sync, the m_synced
/// flag is set and the BlockConnected ValidationInterface callback takes
/// over and the sync thread exits.
void ThreadSync();
/// interrupted with the interrupt parameter. Once the index gets in sync,
/// the m_synced flag is set and the BlockConnected ValidationInterface
/// callback takes over and the sync thread exits.
void ThreadSync(CThreadInterrupt* interrupt);

/// Write the current chain block locator to the DB.
bool WriteBestBlock(const CBlockIndex* block_index);
Expand All @@ -75,24 +74,14 @@ class BaseIndex : public CValidationInterface
virtual const char* GetName() const = 0;

public:
/// Destructor interrupts sync thread if running and blocks until it exits.
virtual ~BaseIndex();
virtual ~BaseIndex() = default;

/// Blocks the current thread until the index is caught up to the current
/// state of the block chain. This only blocks if the index has gotten in
/// sync once and only needs to process blocks in the ValidationInterface
/// queue. If the index is catching up from far behind, this method does
/// not block and immediately returns false.
bool BlockUntilSyncedToCurrentChain();

void Interrupt();

/// Start initializes the sync state and registers the instance as a
/// ValidationInterface so that it stays in sync with blockchain updates.
void Start();

/// Stops the instance from staying in sync with blockchain updates.
void Stop();
};

#endif // BITCOIN_INDEX_BASE_H
28 changes: 28 additions & 0 deletions src/index/error.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2017-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_INDEX_ERROR_H
#define BITCOIN_INDEX_ERROR_H

#include <string>

#include <logging.h>
#include <shutdown.h>
#include <tinyformat.h>
#include <ui_interface.h>
#include <warnings.h>

template<typename... Args>
static void FatalError(const char* fmt, const Args&... args)
{
std::string strMessage = tfm::format(fmt, args...);
SetMiscWarning(strMessage);
LogPrintf("*** %s\n", strMessage);
uiInterface.ThreadSafeMessageBox(
"Error: A fatal internal error occurred, see debug.log for details",
"", CClientUIInterface::MSG_ERROR);
StartShutdown();
}

#endif // BITCOIN_INDEX_ERROR_H
67 changes: 67 additions & 0 deletions src/index/runner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <map>

#include <index/base.h>
#include <index/error.h>
#include <index/runner.h>
#include <validationinterface.h>

static std::map<BaseIndex*, IndexRunner> g_running_indexes;

IndexRunner::IndexRunner(BaseIndex* index) : m_index(index)
{
// Need to register this ValidationInterface before running Init(), so that
// callbacks are not missed if Init sets m_synced to true.
RegisterValidationInterface(m_index);
if (!m_index->Init()) {
FatalError("%s: %s failed to initialize", __func__, m_index->GetName());
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, m_index->GetName(),
std::bind(&BaseIndex::ThreadSync, m_index, &m_interrupt));
}

IndexRunner::~IndexRunner()
{
UnregisterValidationInterface(m_index);

Interrupt();
if (m_thread_sync.joinable()) {
m_thread_sync.join();
}
}

void IndexRunner::Interrupt()
{
m_interrupt();
}

bool StartIndex(BaseIndex* index)
{
if (!index) return false;
auto result = g_running_indexes.emplace(std::piecewise_construct,
std::forward_as_tuple(index),
std::forward_as_tuple(index));
return result.second;
}

bool InterruptIndex(BaseIndex* index)
{
if (!index) return false;

auto it = g_running_indexes.find(index);
if (it == g_running_indexes.end()) return false;

it->second.Interrupt();
return true;
}

bool StopIndex(BaseIndex* index)
{
if (!index) return false;
return g_running_indexes.erase(index);
}
44 changes: 44 additions & 0 deletions src/index/runner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_INDEX_RUNNER_H
#define BITCOIN_INDEX_RUNNER_H

#include <thread>

#include <threadinterrupt.h>

class BaseIndex;

/**
* RAII interface for activating indexes to stay in sync with blockchain updates.
*/
class IndexRunner
{
private:
BaseIndex* m_index;

std::thread m_thread_sync;
CThreadInterrupt m_interrupt;

public:

/**
* Start initializes the sync state and registers the index as a ValidationInterface so that it
* stays in sync with blockchain updates.
*/
explicit IndexRunner(BaseIndex* index);

/** Stops the instance from staying in sync with blockchain updates. */
~IndexRunner();

/** Interrupts the sync thread if it is running. */
void Interrupt();
};

bool StartIndex(BaseIndex* index);
bool InterruptIndex(BaseIndex* index);
bool StopIndex(BaseIndex* index);

#endif // BITCOIN_INDEX_RUNNER_H
1 change: 1 addition & 0 deletions src/index/txindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <index/txindex.h>
#include <shutdown.h>
#include <txdb.h>
#include <ui_interface.h>
#include <util/system.h>
#include <validation.h>
Expand Down
1 change: 0 additions & 1 deletion src/index/txindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#include <chain.h>
#include <index/base.h>
#include <txdb.h>

/**
* TxIndex is used to look up transactions included in the blockchain by hash.
Expand Down
9 changes: 4 additions & 5 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <httprpc.h>
#include <interfaces/chain.h>
#include <index/txindex.h>
#include <index/runner.h>
#include <key.h>
#include <validation.h>
#include <miner.h>
Expand Down Expand Up @@ -184,9 +185,7 @@ void Interrupt()
InterruptMapPort();
if (g_connman)
g_connman->Interrupt();
if (g_txindex) {
g_txindex->Interrupt();
}
InterruptIndex(g_txindex.get());
}

void Shutdown(InitInterfaces& interfaces)
Expand Down Expand Up @@ -217,7 +216,7 @@ void Shutdown(InitInterfaces& interfaces)
// using the other before destroying them.
if (peerLogic) UnregisterValidationInterface(peerLogic.get());
if (g_connman) g_connman->Stop();
if (g_txindex) g_txindex->Stop();
StopIndex(g_txindex.get());

StopTorControl();

Expand Down Expand Up @@ -1641,7 +1640,7 @@ bool AppInitMain(InitInterfaces& interfaces)
// ********************************************************* Step 8: start indexers
if (gArgs.GetBoolArg("-txindex", DEFAULT_TXINDEX)) {
g_txindex = MakeUnique<TxIndex>(nTxIndexCache, false, fReindex);
g_txindex->Start();
StartIndex(g_txindex.get());
}

// ********************************************************* Step 9: load wallet
Expand Down
Loading