Skip to content

Commit

Permalink
Add local thread pool to CCheckQueue and use it in tests
Browse files Browse the repository at this point in the history
Summary:
This is a backport of [[bitcoin/bitcoin#18710 | [[bitcoin/bitcoin#18710 | core#18710]]]] [2/4]
bitcoin/bitcoin@0151177
bitcoin/bitcoin@dba3069

Depends on D10985

Test Plan:
With TSAN:
`ninja && ninja check check-functional`

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Subscribers: Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10986
  • Loading branch information
hebasto authored and PiRK committed Feb 4, 2022
1 parent 83a6f76 commit 8b2338a
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 61 deletions.
51 changes: 46 additions & 5 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#define BITCOIN_CHECKQUEUE_H

#include <sync.h>
#include <tinyformat.h>
#include <util/threadnames.h>

#include <algorithm>
#include <vector>
Expand Down Expand Up @@ -59,8 +61,11 @@ template <typename T> class CCheckQueue {
//! The maximum number of elements to be processed in one batch
const unsigned int nBatchSize;

std::vector<std::thread> m_worker_threads;
bool m_request_stop{false};

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false) {
bool Loop(bool fMaster) {
boost::condition_variable &cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
Expand All @@ -84,7 +89,7 @@ template <typename T> class CCheckQueue {
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
Expand All @@ -97,6 +102,10 @@ template <typename T> class CCheckQueue {
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}

// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for
// increasingly smaller batches so all workers finish
Expand Down Expand Up @@ -137,12 +146,29 @@ template <typename T> class CCheckQueue {
explicit CCheckQueue(unsigned int nBatchSizeIn)
: nBatchSize(nBatchSizeIn) {}

//! Create a pool of new worker threads.
void StartWorkerThreads(const int threads_num) {
{
boost::unique_lock<boost::mutex> lock(mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
util::ThreadRename(strprintf("scriptch.%i", n));
Loop(false /* worker thread */);
});
}
}

//! Worker thread
void Thread() { Loop(); }
void Thread() { Loop(false /* worker thread */); }

//! Wait until execution finishes, and return whether all evaluations were
//! successful.
bool Wait() { return Loop(true); }
bool Wait() { return Loop(true /* master thread */); }

//! Add a batch of checks to the queue
void Add(std::vector<T> &vChecks) {
Expand All @@ -159,7 +185,22 @@ template <typename T> class CCheckQueue {
}
}

~CCheckQueue() {}
//! Stop all of the worker threads.
void StopWorkerThreads() {
{
boost::unique_lock<boost::mutex> lock(mutex);
m_request_stop = true;
}
condWorker.notify_all();
for (std::thread &t : m_worker_threads) {
t.join();
}
m_worker_threads.clear();
boost::unique_lock<boost::mutex> lock(mutex);
m_request_stop = false;
}

~CCheckQueue() { assert(m_worker_threads.empty()); }
};

/**
Expand Down
5 changes: 2 additions & 3 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ void Shutdown(NodeContext &node) {
}
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();

// After the threads that potentially access these pointers have been
// stopped, destruct and reset all to nullptr.
Expand Down Expand Up @@ -2279,9 +2280,7 @@ bool AppInitMain(Config &config, RPCServer &rpcServer,
LogPrintf("Script verification uses %d additional threads\n",
script_threads);
if (script_threads >= 1) {
for (int i = 0; i < script_threads; ++i) {
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
}
StartScriptCheckWorkerThreads(script_threads);
}

assert(!node.scheduler);
Expand Down
49 changes: 12 additions & 37 deletions src/test/checkqueue_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue;
*/
static void Correct_Queue_range(std::vector<size_t> range) {
auto small_queue = std::make_unique<Correct_Queue>(QUEUE_BATCH_SIZE);
boost::thread_group tg;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { small_queue->Thread(); });
}
small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
// Make vChecks here to save on malloc (this test can be slow...)
std::vector<FakeCheckCheckCompletion> vChecks;
for (const size_t i : range) {
Expand All @@ -148,8 +145,7 @@ static void Correct_Queue_range(std::vector<size_t> range) {
BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
}
}
tg.interrupt_all();
tg.join_all();
small_queue->StopWorkerThreads();
}

/** Test that 0 checks is correct
Expand Down Expand Up @@ -189,11 +185,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) {
/** Test that failing checks are caught */
BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) {
auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE);

boost::thread_group tg;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { fail_queue->Thread(); });
}
fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

for (size_t i = 0; i < 1001; ++i) {
CCheckQueueControl<FailingCheck> control(fail_queue.get());
Expand All @@ -215,17 +207,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) {
BOOST_REQUIRE(success);
}
}
tg.interrupt_all();
tg.join_all();
fail_queue->StopWorkerThreads();
}
// Test that a block validation which fails does not interfere with
// future blocks, ie, the bad state is cleared.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) {
auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE);
boost::thread_group tg;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { fail_queue->Thread(); });
}
fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

for (auto times = 0; times < 10; ++times) {
for (const bool end_fails : {true, false}) {
Expand All @@ -240,19 +228,15 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) {
BOOST_REQUIRE(r != end_fails);
}
}
tg.interrupt_all();
tg.join_all();
fail_queue->StopWorkerThreads();
}

// Test that unique checks are actually all called individually, rather than
// just one check being called repeatedly. Test that checks are not called
// more than once as well
BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) {
auto queue = std::make_unique<Unique_Queue>(QUEUE_BATCH_SIZE);
boost::thread_group tg;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { queue->Thread(); });
}
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

size_t COUNT = 100000;
size_t total = COUNT;
Expand All @@ -276,8 +260,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) {
}
BOOST_REQUIRE(r);
}
tg.interrupt_all();
tg.join_all();
queue->StopWorkerThreads();
}

// Test that blocks which might allocate lots of memory free their memory
Expand All @@ -288,10 +271,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) {
// time could leave the data hanging across a sequence of blocks.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) {
auto queue = std::make_unique<Memory_Queue>(QUEUE_BATCH_SIZE);
boost::thread_group tg;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { queue->Thread(); });
}
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
for (size_t i = 0; i < 1000; ++i) {
size_t total = i;
{
Expand All @@ -311,19 +291,15 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) {
}
BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U);
}
tg.interrupt_all();
tg.join_all();
queue->StopWorkerThreads();
}

// Test that a new verification cannot occur until all checks
// have been destructed
BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) {
auto queue = std::make_unique<FrozenCleanup_Queue>(QUEUE_BATCH_SIZE);
boost::thread_group tg;
bool fails = false;
for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) {
tg.create_thread([&] { queue->Thread(); });
}
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
std::thread t0([&]() {
CCheckQueueControl<FrozenCleanupCheck> control(queue.get());
std::vector<FrozenCleanupCheck> vChecks(1);
Expand Down Expand Up @@ -356,9 +332,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) {
FrozenCleanupCheck::cv.notify_one();
// Wait for control to finish
t0.join();
tg.interrupt_all();
tg.join_all();
BOOST_REQUIRE(!fails);
queue->StopWorkerThreads();
}

/** Test that CCheckQueueControl is threadsafe */
Expand Down
10 changes: 2 additions & 8 deletions src/test/transaction_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,10 @@ BOOST_AUTO_TEST_CASE(test_big_transaction) {

// check all inputs concurrently, with the cache
PrecomputedTransactionData txdata(tx);
boost::thread_group threadGroup;
CCheckQueue<CScriptCheck> scriptcheckqueue(128);
CCheckQueueControl<CScriptCheck> control(&scriptcheckqueue);

for (int i = 0; i < 20; i++) {
threadGroup.create_thread(std::bind(&CCheckQueue<CScriptCheck>::Thread,
std::ref(scriptcheckqueue)));
}
scriptcheckqueue.StartWorkerThreads(20);

std::vector<Coin> coins;
for (size_t i = 0; i < mtx.vin.size(); i++) {
Expand All @@ -456,9 +452,7 @@ BOOST_AUTO_TEST_CASE(test_big_transaction) {

bool controlCheck = control.Wait();
BOOST_CHECK(controlCheck);

threadGroup.interrupt_all();
threadGroup.join_all();
scriptcheckqueue.StopWorkerThreads();
}

SignatureData CombineSignatures(const CMutableTransaction &input1,
Expand Down
5 changes: 2 additions & 3 deletions src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ TestingSetup::TestingSetup(const std::string &chainName,
}
}
constexpr int script_check_threads = 2;
for (int i = 0; i < script_check_threads; ++i) {
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
}
StartScriptCheckWorkerThreads(script_check_threads);

m_node.banman = std::make_unique<BanMan>(
m_args.GetDataDirPath() / "banlist.dat", chainparams, nullptr,
Expand All @@ -226,6 +224,7 @@ TestingSetup::~TestingSetup() {
}
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
m_node.connman.reset();
Expand Down
9 changes: 6 additions & 3 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,9 +1498,12 @@ static bool WriteUndoDataForBlock(const CBlockUndo &blockundo,

static CCheckQueue<CScriptCheck> scriptcheckqueue(128);

void ThreadScriptCheck(int worker_num) {
util::ThreadRename(strprintf("scriptch.%i", worker_num));
scriptcheckqueue.Thread();
void StartScriptCheckWorkerThreads(int threads_num) {
scriptcheckqueue.StartWorkerThreads(threads_num);
}

void StopScriptCheckWorkerThreads() {
scriptcheckqueue.StopWorkerThreads();
}

VersionBitsCache versionbitscache GUARDED_BY(cs_main);
Expand Down
9 changes: 7 additions & 2 deletions src/validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,14 @@ bool LoadGenesisBlock(const CChainParams &chainparams);
void UnloadBlockIndex(CTxMemPool *mempool, ChainstateManager &chainman);

/**
* Run an instance of the script checking thread.
* Run instances of script checking worker threads
*/
void ThreadScriptCheck(int worker_num);
void StartScriptCheckWorkerThreads(int threads_num);

/**
* Stop all of the script checking worker threads
*/
void StopScriptCheckWorkerThreads();

/**
* Return transaction from the block at block_index.
Expand Down

0 comments on commit 8b2338a

Please sign in to comment.