Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kernel: Remove dependency on CScheduler #28960

Merged
merged 7 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions contrib/devtools/test_deterministic_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=(
"blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... }
"coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2))
"fs_tests/fsbridge_fstream" # deterministic test failure?
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10)
"scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue()
"scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue()
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
"txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10)
"txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10)
"txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10)
"txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10)
"validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10)
"wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be a nice follow-up to remove the non-deterministic scheduler from all tests, unless they are explicitly testing the scheduler.

cc @aureleoules , who was asking for a solution to this for the corecheck infra

)

TEST_BITCOIN_BINARY="src/test/test_bitcoin"
Expand Down
4 changes: 1 addition & 3 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
Expand Down Expand Up @@ -975,7 +976,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
Expand All @@ -992,7 +992,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
Expand All @@ -1003,7 +1002,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
Expand Down
3 changes: 2 additions & 1 deletion src/bench/wallet_balance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY));
generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY);
}
SyncWithValidationInterfaceQueue();
// Calls SyncWithValidationInterfaceQueue
wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO);

auto bal = GetBalance(wallet); // Cache

Expand Down
23 changes: 6 additions & 17 deletions src/bitcoin-chainstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>

Expand Down Expand Up @@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));


// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d63b2e8

I guess this periodic call was originally included to demonstrate how you might use the scheduler from the libbitcoinkernel interface, but now that we've removed the scheduler from the interface there isn't really an equivalent. Clients would be expected to run their own async event managers and periodic schedulers, which makes sense - and I guess is the whole point of this change!


GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};

class KernelNotifications : public kernel::Notifications
{
Expand Down Expand Up @@ -118,6 +108,7 @@ int main(int argc, char* argv[])
.chainparams = *chainparams,
.datadir = abs_datadir,
.notifications = *notifications,
.signals = &validation_signals,
};
const node::BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
Expand Down Expand Up @@ -235,9 +226,9 @@ int main(int argc, char* argv[])

bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
RegisterSharedValidationInterface(sc);
validation_signals.RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
UnregisterSharedValidationInterface(sc);
validation_signals.UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
std::cerr << "duplicate" << std::endl;
break;
Expand Down Expand Up @@ -287,10 +278,9 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();

GetMainSignals().FlushBackgroundCallbacks();
validation_signals.FlushBackgroundCallbacks();
{
LOCK(cs_main);
for (Chainstate* chainstate : chainman.GetAll()) {
Expand All @@ -300,5 +290,4 @@ int main(int argc, char* argv[])
}
}
}
GetMainSignals().UnregisterBackgroundSignalScheduler();
}
8 changes: 5 additions & 3 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ bool BaseIndex::Init()
return &m_chain->context()->chainman->GetChainstateForIndexing());
// Register to validation interface before setting the 'm_synced' flag, so that
// callbacks are not missed once m_synced is true.
RegisterValidationInterface(this);
m_chain->context()->validation_signals->RegisterValidationInterface(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit "[refactor] De-globalize ValidationSignals" (6ffa572)

Note: This is ugly, but #24230 replaces it with m_chain->attachChain(). Other changes to indexing code in this PR, which also don't look great, have similar replacements.


CBlockLocator locator;
if (!GetDB().ReadBestBlock(locator)) {
Expand Down Expand Up @@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const
}

LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
SyncWithValidationInterfaceQueue();
m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere (including from BaseIndex::Stop()) there is a check whether validation_signals is null, which I think is missing here and in BaseIndex::Init().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comments here for the reasoning. Basically we want to guard in the shutdown (Stop) case and it is less important to do so in the rest, since we have tests heavily exercising those paths. Question is also on which de-reference we should be guarding and what to do about the existing unguarded lines that call m_chain and its members.

return true;
}

Expand All @@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync()

void BaseIndex::Stop()
{
UnregisterValidationInterface(this);
if (m_chain->context()->validation_signals) {
m_chain->context()->validation_signals->UnregisterValidationInterface(this);
}

if (m_thread_sync.joinable()) {
m_thread_sync.join();
Expand Down
47 changes: 28 additions & 19 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void Shutdown(NodeContext& node)

// Because these depend on each-other, we make sure that neither can be
// using the other before destroying them.
if (node.peerman) UnregisterValidationInterface(node.peerman.get());
if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get());
if (node.connman) node.connman->Stop();

StopTorControl();
Expand All @@ -317,7 +317,9 @@ void Shutdown(NodeContext& node)
// fee estimator from validation interface.
if (node.fee_estimator) {
node.fee_estimator->Flush();
UnregisterValidationInterface(node.fee_estimator.get());
if (node.validation_signals) {
node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get());
}
}

// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing
Expand All @@ -332,7 +334,7 @@ void Shutdown(NodeContext& node)

// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
GetMainSignals().FlushBackgroundCallbacks();
if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();

// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
Expand Down Expand Up @@ -367,17 +369,19 @@ void Shutdown(NodeContext& node)

#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface.get());
if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif

node.chain_clients.clear();
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
if (node.validation_signals) {
node.validation_signals->UnregisterAllValidationInterfaces();
}
node.mempool.reset();
node.fee_estimator.reset();
node.chainman.reset();
node.validation_signals.reset();
node.scheduler.reset();
node.kernel.reset();

Expand Down Expand Up @@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

assert(!node.scheduler);
node.scheduler = std::make_unique<CScheduler>();
auto& scheduler = *node.scheduler;

// Start the lightweight task scheduler thread
node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); });
ryanofsky marked this conversation as resolved.
Show resolved Hide resolved
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

// Gather some entropy once per minute.
node.scheduler->scheduleEvery([]{
scheduler.scheduleEvery([]{
RandAddPeriodic();
}, std::chrono::minutes{1});

// Check disk space every 5 minutes to avoid db corruption.
node.scheduler->scheduleEvery([&args, &node]{
scheduler.scheduleEvery([&args, &node]{
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) {
LogPrintf("Shutting down due to lack of disk space!\n");
Expand All @@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}
}, std::chrono::minutes{5});

GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler);
assert(!node.validation_signals);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
auto& validation_signals = *node.validation_signals;
Comment on lines +1166 to +1168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we had a global g_signals and code that wished to access it called GetMainSignals(). This was safe wrt lifetime of the object.

With this PR it lives in NodeContext::validation_signals as std::unique_ptr<ValidationSignals>. Also (for covenience?) the object being pointed to by the unique_ptr is extracted from the unique_ptr and saved as a raw pointer in ChainstateManager::Options::signals and CTxMemPool::Options::signals. This defeats the idea of unique_ptr because after it goes out of scope and destroys the object, the raw pointers will still exist and point to garbage. Even if not possible with the current code, this pattern better be avoided.

It would be better to store shared_ptr<ValidationSignals> in all 3 places because that will eliminate any lifetime dependencies - e.g. object A must be destroyed before object B.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pointers in the options and in CTxMemPool are supposed to be optional values (see the first commit message). Using a raw pointer for optional non-owned values seems like an acceptable pattern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can have optional stuff with shared_ptr as well (if it is empty).


// Create client interfaces for wallets that are supposed to be loaded
// according to -wallet and -disablewallet options. This only constructs
Expand Down Expand Up @@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

// Flush estimates to disk periodically
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get();
node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
RegisterValidationInterface(fee_estimator);
scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
validation_signals.RegisterValidationInterface(fee_estimator);
}

// Check port numbers
Expand Down Expand Up @@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
});

if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface.get());
validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

Expand All @@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
.chainparams = chainparams,
.datadir = args.GetDataDirNet(),
.notifications = *node.notifications,
.signals = &validation_signals,
};
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction

Expand Down Expand Up @@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

CTxMemPool::Options mempool_opts{
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0,
.signals = &validation_signals,
};
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)};
if (!result) {
Expand Down Expand Up @@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

// Drain the validation interface queue to ensure that the old indexes
// don't have any pending work.
SyncWithValidationInterfaceQueue();
Assert(node.validation_signals)->SyncWithValidationInterfaceQueue();
ryanofsky marked this conversation as resolved.
Show resolved Hide resolved

for (auto* index : node.indexes) {
index->Interrupt();
Expand Down Expand Up @@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.peerman = PeerManager::make(*node.connman, *node.addrman,
node.banman.get(), chainman,
*node.mempool, peerman_opts);
RegisterValidationInterface(node.peerman.get());
validation_signals.RegisterValidationInterface(node.peerman.get());

// ********************************************************* Step 8: start indexers

Expand Down Expand Up @@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)

connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);

if (!node.connman->Start(*node.scheduler, connOptions)) {
if (!node.connman->Start(scheduler, connOptions)) {
return false;
}

Expand All @@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
uiInterface.InitMessage(_("Done loading").translated);

for (const auto& client : node.chain_clients) {
client->start(*node.scheduler);
client->start(scheduler);
}

BanMan* banman = node.banman.get();
node.scheduler->scheduleEvery([banman]{
scheduler.scheduleEvery([banman]{
banman->DumpBanlist();
}, DUMP_BANS_INTERVAL);

if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler);
if (node.peerman) node.peerman->StartScheduledTasks(scheduler);

#if HAVE_SYSTEM
StartupNotify(args);
Expand Down
2 changes: 2 additions & 0 deletions src/kernel/chainstatemanager_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <optional>

class CChainParams;
class ValidationSignals;

static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true};
static constexpr auto DEFAULT_MAX_TIP_AGE{24h};
Expand All @@ -44,6 +45,7 @@ struct ChainstateManagerOpts {
DBOptions coins_db{};
CoinsViewOptions coins_view{};
Notifications& notifications;
ValidationSignals* signals{nullptr};
//! Number of script check worker threads. Zero means no parallel verification.
int worker_threads_num{0};
};
Expand Down
4 changes: 4 additions & 0 deletions src/kernel/mempool_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <cstdint>
#include <optional>

class ValidationSignals;

/** Default for -maxmempool, maximum megabytes of mempool memory usage */
static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300};
/** Default for -maxmempool when blocksonly is set */
Expand Down Expand Up @@ -56,6 +58,8 @@ struct MemPoolOptions {
bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF};
bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT};
MemPoolLimits limits{};

ValidationSignals* signals{nullptr};
};
} // namespace kernel

Expand Down
4 changes: 4 additions & 0 deletions src/node/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class BanMan;
class BaseIndex;
class CBlockPolicyEstimator;
class CConnman;
class ValidationSignals;
class CScheduler;
class CTxMemPool;
class ChainstateManager;
Expand Down Expand Up @@ -70,7 +71,10 @@ struct NodeContext {
interfaces::WalletLoader* wallet_loader{nullptr};
std::unique_ptr<CScheduler> scheduler;
std::function<void()> rpc_interruption_point = [] {};
//! Issues blocking calls about sync status, errors and warnings
std::unique_ptr<KernelNotifications> notifications;
//! Issues calls about blocks and transactions
std::unique_ptr<ValidationSignals> validation_signals;
TheCharlatan marked this conversation as resolved.
Show resolved Hide resolved
std::atomic<int> exit_status{EXIT_SUCCESS};

//! Declare default constructor and destructor that are not inline, so code
Expand Down