-
Notifications
You must be signed in to change notification settings - Fork 35.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kernel: Remove dependency on CScheduler #28960
Changes from all commits
3fba3d5
473dd4b
84f5c13
4abde2c
0d6d2b6
06069b3
d5228ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,11 +23,10 @@ | |
#include <node/caches.h> | ||
#include <node/chainstate.h> | ||
#include <random.h> | ||
#include <scheduler.h> | ||
#include <script/sigcache.h> | ||
#include <util/chaintype.h> | ||
#include <util/fs.h> | ||
#include <util/thread.h> | ||
#include <util/task_runner.h> | ||
#include <validation.h> | ||
#include <validationinterface.h> | ||
|
||
|
@@ -68,16 +67,7 @@ int main(int argc, char* argv[]) | |
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes)); | ||
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes)); | ||
|
||
|
||
// SETUP: Scheduling and Background Signals | ||
CScheduler scheduler{}; | ||
// Start the lightweight task scheduler thread | ||
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); | ||
|
||
// Gather some entropy once per minute. | ||
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this periodic call was originally included to demonstrate how you might use the scheduler from the libbitcoinkernel interface, but now that we've removed the scheduler from the interface there isn't really an equivalent. Clients would be expected to run their own async event managers and periodic schedulers, which makes sense - and I guess is the whole point of this change! |
||
|
||
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); | ||
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()}; | ||
|
||
class KernelNotifications : public kernel::Notifications | ||
{ | ||
|
@@ -118,6 +108,7 @@ int main(int argc, char* argv[]) | |
.chainparams = *chainparams, | ||
.datadir = abs_datadir, | ||
.notifications = *notifications, | ||
.signals = &validation_signals, | ||
}; | ||
const node::BlockManager::Options blockman_opts{ | ||
.chainparams = chainman_opts.chainparams, | ||
|
@@ -235,9 +226,9 @@ int main(int argc, char* argv[]) | |
|
||
bool new_block; | ||
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash()); | ||
RegisterSharedValidationInterface(sc); | ||
validation_signals.RegisterSharedValidationInterface(sc); | ||
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block); | ||
UnregisterSharedValidationInterface(sc); | ||
validation_signals.UnregisterSharedValidationInterface(sc); | ||
if (!new_block && accepted) { | ||
std::cerr << "duplicate" << std::endl; | ||
break; | ||
|
@@ -287,10 +278,9 @@ int main(int argc, char* argv[]) | |
epilogue: | ||
// Without this precise shutdown sequence, there will be a lot of nullptr | ||
// dereferencing and UB. | ||
scheduler.stop(); | ||
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join(); | ||
|
||
GetMainSignals().FlushBackgroundCallbacks(); | ||
validation_signals.FlushBackgroundCallbacks(); | ||
{ | ||
LOCK(cs_main); | ||
for (Chainstate* chainstate : chainman.GetAll()) { | ||
|
@@ -300,5 +290,4 @@ int main(int argc, char* argv[]) | |
} | ||
} | ||
} | ||
GetMainSignals().UnregisterBackgroundSignalScheduler(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,7 @@ bool BaseIndex::Init() | |
return &m_chain->context()->chainman->GetChainstateForIndexing()); | ||
// Register to validation interface before setting the 'm_synced' flag, so that | ||
// callbacks are not missed once m_synced is true. | ||
RegisterValidationInterface(this); | ||
m_chain->context()->validation_signals->RegisterValidationInterface(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
CBlockLocator locator; | ||
if (!GetDB().ReadBestBlock(locator)) { | ||
|
@@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const | |
} | ||
|
||
LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName()); | ||
SyncWithValidationInterfaceQueue(); | ||
m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Elsewhere (including from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See the comments here for the reasoning. Basically we want to guard in the shutdown ( |
||
return true; | ||
} | ||
|
||
|
@@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync() | |
|
||
void BaseIndex::Stop() | ||
{ | ||
UnregisterValidationInterface(this); | ||
if (m_chain->context()->validation_signals) { | ||
m_chain->context()->validation_signals->UnregisterValidationInterface(this); | ||
} | ||
|
||
if (m_thread_sync.joinable()) { | ||
m_thread_sync.join(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -291,7 +291,7 @@ void Shutdown(NodeContext& node) | |
|
||
// Because these depend on each-other, we make sure that neither can be | ||
// using the other before destroying them. | ||
if (node.peerman) UnregisterValidationInterface(node.peerman.get()); | ||
if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get()); | ||
if (node.connman) node.connman->Stop(); | ||
|
||
StopTorControl(); | ||
|
@@ -317,7 +317,9 @@ void Shutdown(NodeContext& node) | |
// fee estimator from validation interface. | ||
if (node.fee_estimator) { | ||
node.fee_estimator->Flush(); | ||
UnregisterValidationInterface(node.fee_estimator.get()); | ||
if (node.validation_signals) { | ||
node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get()); | ||
} | ||
} | ||
|
||
// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing | ||
|
@@ -332,7 +334,7 @@ void Shutdown(NodeContext& node) | |
|
||
// After there are no more peers/RPC left to give us new data which may generate | ||
// CValidationInterface callbacks, flush them... | ||
GetMainSignals().FlushBackgroundCallbacks(); | ||
if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks(); | ||
|
||
// Stop and delete all indexes only after flushing background callbacks. | ||
if (g_txindex) { | ||
|
@@ -367,17 +369,19 @@ void Shutdown(NodeContext& node) | |
|
||
#if ENABLE_ZMQ | ||
if (g_zmq_notification_interface) { | ||
UnregisterValidationInterface(g_zmq_notification_interface.get()); | ||
if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get()); | ||
g_zmq_notification_interface.reset(); | ||
} | ||
#endif | ||
|
||
node.chain_clients.clear(); | ||
UnregisterAllValidationInterfaces(); | ||
GetMainSignals().UnregisterBackgroundSignalScheduler(); | ||
if (node.validation_signals) { | ||
node.validation_signals->UnregisterAllValidationInterfaces(); | ||
} | ||
node.mempool.reset(); | ||
node.fee_estimator.reset(); | ||
node.chainman.reset(); | ||
node.validation_signals.reset(); | ||
node.scheduler.reset(); | ||
node.kernel.reset(); | ||
|
||
|
@@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
|
||
assert(!node.scheduler); | ||
node.scheduler = std::make_unique<CScheduler>(); | ||
auto& scheduler = *node.scheduler; | ||
|
||
// Start the lightweight task scheduler thread | ||
node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); }); | ||
ryanofsky marked this conversation as resolved.
Show resolved
Hide resolved
|
||
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); | ||
|
||
// Gather some entropy once per minute. | ||
node.scheduler->scheduleEvery([]{ | ||
scheduler.scheduleEvery([]{ | ||
RandAddPeriodic(); | ||
}, std::chrono::minutes{1}); | ||
|
||
// Check disk space every 5 minutes to avoid db corruption. | ||
node.scheduler->scheduleEvery([&args, &node]{ | ||
scheduler.scheduleEvery([&args, &node]{ | ||
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB | ||
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) { | ||
LogPrintf("Shutting down due to lack of disk space!\n"); | ||
|
@@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
} | ||
}, std::chrono::minutes{5}); | ||
|
||
GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler); | ||
assert(!node.validation_signals); | ||
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler)); | ||
auto& validation_signals = *node.validation_signals; | ||
Comment on lines
+1166
to
+1168
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before we had a global With this PR it lives in It would be better to store There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pointers in the options and in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can have optional stuff with |
||
|
||
// Create client interfaces for wallets that are supposed to be loaded | ||
// according to -wallet and -disablewallet options. This only constructs | ||
|
@@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
|
||
// Flush estimates to disk periodically | ||
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get(); | ||
node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); | ||
RegisterValidationInterface(fee_estimator); | ||
scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); | ||
validation_signals.RegisterValidationInterface(fee_estimator); | ||
} | ||
|
||
// Check port numbers | ||
|
@@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
}); | ||
|
||
if (g_zmq_notification_interface) { | ||
RegisterValidationInterface(g_zmq_notification_interface.get()); | ||
validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get()); | ||
} | ||
#endif | ||
|
||
|
@@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
.chainparams = chainparams, | ||
.datadir = args.GetDataDirNet(), | ||
.notifications = *node.notifications, | ||
.signals = &validation_signals, | ||
}; | ||
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction | ||
|
||
|
@@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
|
||
CTxMemPool::Options mempool_opts{ | ||
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0, | ||
.signals = &validation_signals, | ||
}; | ||
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)}; | ||
if (!result) { | ||
|
@@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
|
||
// Drain the validation interface queue to ensure that the old indexes | ||
// don't have any pending work. | ||
SyncWithValidationInterfaceQueue(); | ||
Assert(node.validation_signals)->SyncWithValidationInterfaceQueue(); | ||
ryanofsky marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for (auto* index : node.indexes) { | ||
index->Interrupt(); | ||
|
@@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
node.peerman = PeerManager::make(*node.connman, *node.addrman, | ||
node.banman.get(), chainman, | ||
*node.mempool, peerman_opts); | ||
RegisterValidationInterface(node.peerman.get()); | ||
validation_signals.RegisterValidationInterface(node.peerman.get()); | ||
|
||
// ********************************************************* Step 8: start indexers | ||
|
||
|
@@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
|
||
connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING); | ||
|
||
if (!node.connman->Start(*node.scheduler, connOptions)) { | ||
if (!node.connman->Start(scheduler, connOptions)) { | ||
return false; | ||
} | ||
|
||
|
@@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) | |
uiInterface.InitMessage(_("Done loading").translated); | ||
|
||
for (const auto& client : node.chain_clients) { | ||
client->start(*node.scheduler); | ||
client->start(scheduler); | ||
} | ||
|
||
BanMan* banman = node.banman.get(); | ||
node.scheduler->scheduleEvery([banman]{ | ||
scheduler.scheduleEvery([banman]{ | ||
banman->DumpBanlist(); | ||
}, DUMP_BANS_INTERVAL); | ||
|
||
if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler); | ||
if (node.peerman) node.peerman->StartScheduledTasks(scheduler); | ||
|
||
#if HAVE_SYSTEM | ||
StartupNotify(args); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be a nice follow-up to remove the non-deterministic scheduler from all tests, unless they are explicitly testing the scheduler.
cc @aureleoules , who was asking for a solution to this for the corecheck infra