Skip to content

Commit

Permalink
Merge #28960: kernel: Remove dependency on CScheduler
Browse files Browse the repository at this point in the history
d5228ef kernel: Remove dependency on CScheduler (TheCharlatan)
06069b3 scripted-diff: Rename MainSignals to ValidationSignals (TheCharlatan)
0d6d2b6 scripted-diff: Rename SingleThreadedSchedulerClient to SerialTaskRunner (TheCharlatan)
4abde2c [refactor] Make MainSignals RAII styled (TheCharlatan)
84f5c13 refactor: De-globalize g_signals (TheCharlatan)
473dd4b [refactor] Prepare for g_signals de-globalization (TheCharlatan)
3fba3d5 [refactor] Make signals optional in mempool and chainman (TheCharlatan)

Pull request description:

  By defining a virtual interface class for the scheduler client, users of the kernel can now define their own event consuming infrastructure, without having to spawn threads or rely on the scheduler design.

  Removing `CScheduler` also allows removing the thread and exception modules from the kernel library.

  To make the `CMainSignals` class easier to use from a kernel library perspective, remove its global instantiation and adopt RAII practices.

  Renames `CMainSignals` to `ValidationSignals`, which more accurately describes its purpose and scope.

  Also make the `ValidationSignals` in the `ChainstateManager` and CTxMemPool` optional. This could be useful in the future for using or testing these classes without having to instantiate any form of signal handling.

  ---

  This PR is part of the [libbitcoinkernel project](#27587). It improves the kernel API and removes two modules from the kernel library.

ACKs for top commit:
  maflcko:
    re-ACK d5228ef 🌄
  ryanofsky:
    Code review ACK d5228ef. Just comment change since last review.
  vasild:
    ACK d5228ef
  furszy:
    diff ACK d5228ef

Tree-SHA512: e93a5f10eb6182effb84bb981859a7ce750e466efd8171045d8d9e7fe46e4065631d9f6f533c5967c4d34c9bb7d7a67e9f4593bd4c5b30cd7b3bbad7be7b331b
  • Loading branch information
achow101 committed Mar 9, 2024
2 parents 1cd2e29 + d5228ef commit c07935b
Show file tree
Hide file tree
Showing 40 changed files with 365 additions and 289 deletions.
26 changes: 13 additions & 13 deletions contrib/devtools/test_deterministic_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=(
"blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... }
"coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2))
"fs_tests/fsbridge_fstream" # deterministic test failure?
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10)
"scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue()
"scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue()
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10)
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10)
"txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10)
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10)
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10)
)

TEST_BITCOIN_BINARY="src/test/test_bitcoin"
Expand Down
4 changes: 1 addition & 3 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
Expand Down Expand Up @@ -966,7 +967,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
Expand All @@ -983,7 +983,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
Expand All @@ -994,7 +993,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
Expand Down
3 changes: 2 additions & 1 deletion src/bench/wallet_balance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY));
generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY);
}
SyncWithValidationInterfaceQueue();
// Calls SyncWithValidationInterfaceQueue
wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO);

auto bal = GetBalance(wallet); // Cache

Expand Down
23 changes: 6 additions & 17 deletions src/bitcoin-chainstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>

Expand Down Expand Up @@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));


// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});

GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};

class KernelNotifications : public kernel::Notifications
{
Expand Down Expand Up @@ -118,6 +108,7 @@ int main(int argc, char* argv[])
.chainparams = *chainparams,
.datadir = abs_datadir,
.notifications = *notifications,
.signals = &validation_signals,
};
const node::BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
Expand Down Expand Up @@ -235,9 +226,9 @@ int main(int argc, char* argv[])

bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
RegisterSharedValidationInterface(sc);
validation_signals.RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
UnregisterSharedValidationInterface(sc);
validation_signals.UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
std::cerr << "duplicate" << std::endl;
break;
Expand Down Expand Up @@ -287,10 +278,9 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();

GetMainSignals().FlushBackgroundCallbacks();
validation_signals.FlushBackgroundCallbacks();
{
LOCK(cs_main);
for (Chainstate* chainstate : chainman.GetAll()) {
Expand All @@ -300,5 +290,4 @@ int main(int argc, char* argv[])
}
}
}
GetMainSignals().UnregisterBackgroundSignalScheduler();
}
8 changes: 5 additions & 3 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ bool BaseIndex::Init()
return &m_chain->context()->chainman->GetChainstateForIndexing());
// Register to validation interface before setting the 'm_synced' flag, so that
// callbacks are not missed once m_synced is true.
RegisterValidationInterface(this);
m_chain->context()->validation_signals->RegisterValidationInterface(this);

CBlockLocator locator;
if (!GetDB().ReadBestBlock(locator)) {
Expand Down Expand Up @@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const
}

LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
SyncWithValidationInterfaceQueue();
m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
return true;
}

Expand All @@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync()

void BaseIndex::Stop()
{
UnregisterValidationInterface(this);
if (m_chain->context()->validation_signals) {
m_chain->context()->validation_signals->UnregisterValidationInterface(this);
}

if (m_thread_sync.joinable()) {
m_thread_sync.join();
Expand Down
47 changes: 28 additions & 19 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void Shutdown(NodeContext& node)

// Because these depend on each-other, we make sure that neither can be
// using the other before destroying them.
if (node.peerman) UnregisterValidationInterface(node.peerman.get());
if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get());
if (node.connman) node.connman->Stop();

StopTorControl();
Expand All @@ -317,7 +317,9 @@ void Shutdown(NodeContext& node)
// fee estimator from validation interface.
if (node.fee_estimator) {
node.fee_estimator->Flush();
UnregisterValidationInterface(node.fee_estimator.get());
if (node.validation_signals) {
node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get());
}
}

// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing
Expand All @@ -332,7 +334,7 @@ void Shutdown(NodeContext& node)

// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
GetMainSignals().FlushBackgroundCallbacks();
if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();

// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
Expand Down Expand Up @@ -367,17 +369,19 @@ void Shutdown(NodeContext& node)

#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface.get());
if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif

node.chain_clients.clear();
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
if (node.validation_signals) {
node.validation_signals->UnregisterAllValidationInterfaces();
}
node.mempool.reset();
node.fee_estimator.reset();
node.chainman.reset();
node.validation_signals.reset();
node.scheduler.reset();
node.kernel.reset();

Expand Down Expand Up @@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

assert(!node.scheduler);
node.scheduler = std::make_unique<CScheduler>();
auto& scheduler = *node.scheduler;

// Start the lightweight task scheduler thread
node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); });
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

// Gather some entropy once per minute.
node.scheduler->scheduleEvery([]{
scheduler.scheduleEvery([]{
RandAddPeriodic();
}, std::chrono::minutes{1});

// Check disk space every 5 minutes to avoid db corruption.
node.scheduler->scheduleEvery([&args, &node]{
scheduler.scheduleEvery([&args, &node]{
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) {
LogPrintf("Shutting down due to lack of disk space!\n");
Expand All @@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}
}, std::chrono::minutes{5});

GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler);
assert(!node.validation_signals);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
auto& validation_signals = *node.validation_signals;

// Create client interfaces for wallets that are supposed to be loaded
// according to -wallet and -disablewallet options. This only constructs
Expand Down Expand Up @@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

// Flush estimates to disk periodically
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get();
node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
RegisterValidationInterface(fee_estimator);
scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
validation_signals.RegisterValidationInterface(fee_estimator);
}

// Check port numbers
Expand Down Expand Up @@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
});

if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface.get());
validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

Expand All @@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
.chainparams = chainparams,
.datadir = args.GetDataDirNet(),
.notifications = *node.notifications,
.signals = &validation_signals,
};
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction

Expand Down Expand Up @@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

CTxMemPool::Options mempool_opts{
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0,
.signals = &validation_signals,
};
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)};
if (!result) {
Expand Down Expand Up @@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

// Drain the validation interface queue to ensure that the old indexes
// don't have any pending work.
SyncWithValidationInterfaceQueue();
Assert(node.validation_signals)->SyncWithValidationInterfaceQueue();

for (auto* index : node.indexes) {
index->Interrupt();
Expand Down Expand Up @@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.peerman = PeerManager::make(*node.connman, *node.addrman,
node.banman.get(), chainman,
*node.mempool, peerman_opts);
RegisterValidationInterface(node.peerman.get());
validation_signals.RegisterValidationInterface(node.peerman.get());

// ********************************************************* Step 8: start indexers

Expand Down Expand Up @@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);

if (!node.connman->Start(*node.scheduler, connOptions)) {
if (!node.connman->Start(scheduler, connOptions)) {
return false;
}

Expand All @@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
uiInterface.InitMessage(_("Done loading").translated);

for (const auto& client : node.chain_clients) {
client->start(*node.scheduler);
client->start(scheduler);
}

BanMan* banman = node.banman.get();
node.scheduler->scheduleEvery([banman]{
scheduler.scheduleEvery([banman]{
banman->DumpBanlist();
}, DUMP_BANS_INTERVAL);

if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler);
if (node.peerman) node.peerman->StartScheduledTasks(scheduler);

#if HAVE_SYSTEM
StartupNotify(args);
Expand Down
2 changes: 2 additions & 0 deletions src/kernel/chainstatemanager_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <optional>

class CChainParams;
class ValidationSignals;

static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true};
static constexpr auto DEFAULT_MAX_TIP_AGE{24h};
Expand All @@ -44,6 +45,7 @@ struct ChainstateManagerOpts {
DBOptions coins_db{};
CoinsViewOptions coins_view{};
Notifications& notifications;
ValidationSignals* signals{nullptr};
//! Number of script check worker threads. Zero means no parallel verification.
int worker_threads_num{0};
};
Expand Down
4 changes: 4 additions & 0 deletions src/kernel/mempool_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <cstdint>
#include <optional>

class ValidationSignals;

/** Default for -maxmempool, maximum megabytes of mempool memory usage */
static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300};
/** Default for -maxmempool when blocksonly is set */
Expand Down Expand Up @@ -56,6 +58,8 @@ struct MemPoolOptions {
bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF};
bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT};
MemPoolLimits limits{};

ValidationSignals* signals{nullptr};
};
} // namespace kernel

Expand Down
4 changes: 4 additions & 0 deletions src/node/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class BanMan;
class BaseIndex;
class CBlockPolicyEstimator;
class CConnman;
class ValidationSignals;
class CScheduler;
class CTxMemPool;
class ChainstateManager;
Expand Down Expand Up @@ -70,7 +71,10 @@ struct NodeContext {
interfaces::WalletLoader* wallet_loader{nullptr};
std::unique_ptr<CScheduler> scheduler;
std::function<void()> rpc_interruption_point = [] {};
//! Issues blocking calls about sync status, errors and warnings
std::unique_ptr<KernelNotifications> notifications;
//! Issues calls about blocks and transactions
std::unique_ptr<ValidationSignals> validation_signals;
std::atomic<int> exit_status{EXIT_SUCCESS};

//! Declare default constructor and destructor that are not inline, so code
Expand Down

0 comments on commit c07935b

Please sign in to comment.