Skip to content

Commit

Permalink
Be explicit about RPA CoTask lifecycle; don't use thread_local
Browse files Browse the repository at this point in the history
Use of thread_local CoTask to parallelize RPA indexing was added
by a recent commit: e4b01d7

The situation is fragile and there is potential for UB here due to
static initialization hell.

Instead, we explicitly create the CoTask in the DownloadBlocksTask
thread and it lives as a DownloadBlocksTask member variable, which
is more correct and less bugprone.
  • Loading branch information
cculianu committed Jun 8, 2024
1 parent 6bfae61 commit 10623d7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
17 changes: 4 additions & 13 deletions src/BlockProc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/* static */ const TxHash PreProcessedBlock::nullhash;

/// fill this struct's data with all the txdata, etc from a bitcoin CBlock. Alternative to using the second c'tor.
void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bitcoin::CBlock &b, const bool enableRpa) {
void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bitcoin::CBlock &b, CoTask *rpaTask) {
if (!header.IsNull() || !txInfos.empty())
clear();
height = blockHeight;
Expand All @@ -45,17 +45,8 @@ void PreProcessedBlock::fill(BlockHeight blockHeight, size_t blockSize, const bi
std::unordered_map<TxHash, unsigned, HashHasher> txHashToIndex; // since we know the size ahead of time here, we can set max_load_factor to 1.0 and avoid over-allocating the hash table
txHashToIndex.max_load_factor(1.0);
txHashToIndex.reserve(b.vtx.size());
static thread_local std::optional<CoTask> rpaTask;
std::optional<CoTask::Future> rpaFut; // NB: rpaFut will auto-wait for work (if any) to complete as part of its d'tor
if (enableRpa) {
if (!rpaTask) {
QString threadName = Util::ThreadName::Get();
if (threadName.isEmpty()) threadName = "???"; // this should ideally not happen, but is here for defensive programming
// Create a new CoTask into TLS. It will be destructed when the current thread exits.
// The assumption here is that the DownloadBlocksTask is calling us and its threads stick around for a while
// as blocks are downloaded.
rpaTask.emplace(QString("RPA CoTask[%1]").arg(threadName));
}
if (rpaTask) {
// Do RPA-related hashing and processing in the rpaTask's thread in parallel (really pays off for 1-off blocks)
rpaFut = rpaTask->submitWork([&b, this]{
// Process the first 30 inputs for each non-coinbase block txn
Expand Down Expand Up @@ -278,9 +269,9 @@ QString PreProcessedBlock::toDebugString() const

/// convenience factory static method: given a block, return a shard_ptr instance of this struct
/*static*/
PreProcessedBlockPtr PreProcessedBlock::makeShared(unsigned height_, size_t size, const bitcoin::CBlock &block, bool enableRpa)
PreProcessedBlockPtr PreProcessedBlock::makeShared(unsigned height_, size_t size, const bitcoin::CBlock &block, CoTask *rpaTask)
{
return std::make_shared<PreProcessedBlock>(height_, size, block, enableRpa);
return std::make_shared<PreProcessedBlock>(height_, size, block, rpaTask);
}


Expand Down
9 changes: 5 additions & 4 deletions src/BlockProc.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <unordered_set>
#include <vector>

class CoTask;

struct PreProcessedBlock;
using PreProcessedBlockPtr = std::shared_ptr<PreProcessedBlock>; ///< For clarity/convenience
Expand Down Expand Up @@ -163,17 +164,17 @@ struct PreProcessedBlock

// c'tors, etc... note this class is fully copyable and moveable
PreProcessedBlock() = default;
PreProcessedBlock(BlockHeight bheight, size_t rawBlockSizeBytes, const bitcoin::CBlock &b, bool enableRpaIndexing) {
fill(bheight, rawBlockSizeBytes, b, enableRpaIndexing);
PreProcessedBlock(BlockHeight bheight, size_t rawBlockSizeBytes, const bitcoin::CBlock &b, CoTask *rpaTask /* nullable */) {
fill(bheight, rawBlockSizeBytes, b, rpaTask);
}
/// reset this to empty
inline void clear() { *this = PreProcessedBlock(); }
/// fill this block with data from bitcoin's CBlock
void fill(BlockHeight blockHeight, size_t rawSizeBytes, const bitcoin::CBlock &b, bool enableRpaIndexing);
void fill(BlockHeight blockHeight, size_t rawSizeBytes, const bitcoin::CBlock &b, CoTask *rpaTask /* nullable */);

/// convenience factory static method: given a block, return a shard_ptr instance of this struct
static PreProcessedBlockPtr makeShared(unsigned height, size_t sizeBytes, const bitcoin::CBlock &block,
bool enableRpaIndexing);
CoTask *rpaTask /* nullable */);

/// debug string
QString toDebugString() const;
Expand Down
17 changes: 14 additions & 3 deletions src/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "Controller.h"
#include "Controller_SynchDSPsTask.h"
#include "Controller_SynchMempoolTask.h"
#include "CoTask.h"
#include "Mempool.h"
#include "SubsMgr.h"
#include "ThreadPool.h"
Expand Down Expand Up @@ -495,6 +496,7 @@ struct DownloadBlocksTask : CtlTask
const bool allowMimble; ///< like above, but if true we allow mimblewimble (litecoin)
const bool allowCashTokens; ///< allow special cashtoken deserialization rules (BCH only)
const int rpaStartHeight; ///< if >= 0, rpa data will be indexed in PreProcessedBlock, starting at this height.
std::optional<CoTask> rpaTask; ///< this gets created only at the point where current block height >= rpaStartHeight && rpaStartHeight > -1

void do_get(unsigned height);

Expand Down Expand Up @@ -705,9 +707,18 @@ void DownloadBlocksTask::do_get(unsigned int bnum)
// be out-of-synch due to configuration change, etc).
VarDLTaskResult DownloadBlocksTask::process_block_guts(unsigned bnum, const QByteArray &rawblock, const bitcoin::CBlock &cblock)
{
const bool indexRpaForThisBlock = rpaStartHeight >= 0 && bnum >= unsigned(rpaStartHeight);
auto ppb = PreProcessedBlock::makeShared(bnum, size_t(rawblock.size()), cblock, indexRpaForThisBlock);
if (UNLIKELY(rpaStartHeight >= 0 && bnum == unsigned(rpaStartHeight))) {
CoTask * rpaTaskIfEnabledForThisBlock = nullptr;
// Determine if RPA indexing is enabled for this block, and if so, ensure this->rpaTask is created and pass down a
// pointer to it. The non-null ptr then tells PreProcessedBlock to index RPA data for this block.
const bool rpaIsEnabledForThisBlock = rpaStartHeight >= 0 && bnum >= unsigned(rpaStartHeight);
if (rpaIsEnabledForThisBlock) {
if (!rpaTask) rpaTask.emplace(QString("RPA CoTask[%1]").arg(objectName()));
rpaTaskIfEnabledForThisBlock = &*rpaTask;
}

auto ppb = PreProcessedBlock::makeShared(bnum, size_t(rawblock.size()), cblock, rpaTaskIfEnabledForThisBlock);

if (UNLIKELY(rpaIsEnabledForThisBlock && bnum == unsigned(rpaStartHeight))) {
Util::AsyncOnObject(ctl, [height = rpaStartHeight]{
// We do this in the Controller thread to make the log look pretty, since all other logging
// user sees at this point is from the Controller thread anyway ...
Expand Down

0 comments on commit 10623d7

Please sign in to comment.