From 0e94c77f6ef5b6db5e88fca3736da834e5e51172 Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Mon, 7 Jun 2021 04:00:45 +0300 Subject: [PATCH] Add local thread pool to CCheckQueue Backport of bitcoin/bitcoin#18710 PR message: This PR: gets rid of boost::thread_group in the CCheckQueue class allows thread safety annotation usage in the CCheckQueue class is alternative to #14464 (#18710 (comment), #18710 (comment)) Also, with this PR (I hope) it could be easier to resurrect a bunch of brilliant ideas from #9938. Related: #17307 Squashed commits: 0ef938685b5c079a6f5a98daf0e3865d718d817b refactor: Use member initializers in CCheckQueue 01511776acb0c7ec216dc9c8112531067763f1cb Add local thread pool to CCheckQueue dba30695fc42f45828db008e7e5b81cb2b5d8551 test: Use CCheckQueue local thread pool 6784ac471bb32b6bb8e2de60986f123eb4990706 bench: Use CCheckQueue local thread pool bb6fcc75d1ec94b733d1477c816351c50be5faf9 refactor: Drop boost::thread stuff in CCheckQueue Also in this commit: Slight modification to our custom CCheckQueue_32MB benchmark to use the new API. Signed-off-by: Calin Culianu --- src/bench/checkqueue.cpp | 24 +++------ src/checkqueue.h | 94 +++++++++++++++++++++++----------- src/init.cpp | 5 +- src/test/checkqueue_tests.cpp | 52 +++++-------------- src/test/setup_common.cpp | 5 +- src/test/transaction_tests.cpp | 9 +--- src/validation.cpp | 9 ++-- src/validation.h | 8 +-- test/lint/lint-includes.sh | 1 - 9 files changed, 100 insertions(+), 107 deletions(-) diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index 6c9e4af413..c03457c78c 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -16,8 +16,6 @@ #include #include -#include - #include #include #include @@ -43,10 +41,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State &state) { void swap(PrevectorJob &x) { p.swap(x.p); }; }; CCheckQueue queue{QUEUE_BATCH_SIZE}; - boost::thread_group tg; - for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) { - tg.create_thread([&] { queue.Thread(); }); - } + queue.StartWorkerThreads(std::max(MIN_CORES, GetNumCores())); while (state.KeepRunning()) { // Make insecure_rand here so that each iteration is identical. FastRandomContext insecure_rand(true); @@ -63,8 +58,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State &state) { // for clarity control.Wait(); } - tg.interrupt_all(); - tg.join_all(); + queue.StopWorkerThreads(); } static void CCheckQueue_RealData32MB(bool cacheSigs, benchmark::State &state) { @@ -132,19 +126,15 @@ static void CCheckQueue_RealData32MB(bool cacheSigs, benchmark::State &state) { // Step 3: Setup threads for our CCheckQueue CCheckQueue queue{QUEUE_BATCH_SIZE}; - boost::thread_group tg; int nThreads = gArgs.GetArg("-par", DEFAULT_SCRIPTCHECK_THREADS); const int nCores = std::max(GetNumCores(), 1); if (!nThreads) nThreads = nCores; - else if (nThreads < 0) nThreads = std::max(0, nCores + nThreads); // negative means leave n cores free - LogPrintf("%s: Using %d threads for signature verification\n", __func__, nThreads); + else if (nThreads < 0) nThreads = std::max(1, nCores + nThreads); // negative means leave n cores free + LogPrintf("%s: Using %d thread%s for signature verification\n", __func__, nThreads, nThreads != 1 ? "s" : ""); --nThreads; // account for the fact that this main thread also does processing in .Wait() below - for (int i = 0; i < nThreads; ++i) { - tg.create_thread([&] { queue.Thread(); }); - } - Defer d([&tg]{ - tg.interrupt_all(); - tg.join_all(); + queue.StartWorkerThreads(nThreads); + Defer d([&queue]{ + queue.StopWorkerThreads(); }); // And finally: Run the benchmark diff --git a/src/checkqueue.h b/src/checkqueue.h index 30c63706d8..6a0a521b12 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -1,4 +1,4 @@ -// Copyright (c) 2012-2018 The Bitcoin Core developers +// Copyright (c) 2012-2021 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -6,13 +6,12 @@ #define BITCOIN_CHECKQUEUE_H #include +#include +#include #include #include -#include -#include - template class CCheckQueueControl; /** @@ -28,47 +27,50 @@ template class CCheckQueueControl; template class CCheckQueue { private: //! Mutex to protect the inner state - boost::mutex mutex; + Mutex m_mutex; //! Worker threads block on this when out of work - boost::condition_variable condWorker; + std::condition_variable m_worker_cv; //! Master thread blocks on this when out of work - boost::condition_variable condMaster; + std::condition_variable m_master_cv; //! The queue of elements to be processed. //! As the order of booleans doesn't matter, it is used as a LIFO (stack) - std::vector queue; + std::vector queue GUARDED_BY(m_mutex); //! The number of workers (including the master) that are idle. - int nIdle; + int nIdle GUARDED_BY(m_mutex){0}; //! The total number of workers (including the master). - int nTotal; + int nTotal GUARDED_BY(m_mutex){0}; //! The temporary evaluation result. - bool fAllOk; + bool fAllOk GUARDED_BY(m_mutex){true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo; + unsigned int nTodo GUARDED_BY(m_mutex){0}; //! The maximum number of elements to be processed in one batch - unsigned int nBatchSize; + const unsigned int nBatchSize; + + std::vector m_worker_threads; + bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) { - boost::condition_variable &cond = fMaster ? condMaster : condWorker; + bool Loop(bool fMaster) { + std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; vChecks.reserve(nBatchSize); unsigned int nNow = 0; bool fOk = true; do { { - boost::unique_lock lock(mutex); + WAIT_LOCK(m_mutex, lock); // first do the clean-up of the previous loop run (allowing us // to do it in the same critsect) if (nNow) { @@ -77,14 +79,14 @@ template class CCheckQueue { if (nTodo == 0 && !fMaster) { // We processed the last element; inform the master it // can exit and return the result - condMaster.notify_one(); + m_master_cv.notify_one(); } } else { // first iteration nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -97,6 +99,10 @@ template class CCheckQueue { cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for // increasingly smaller batches so all workers finish @@ -131,36 +137,62 @@ template class CCheckQueue { public: //! Mutex to ensure only one concurrent CCheckQueueControl - boost::mutex ControlMutex; + Mutex m_control_mutex; //! Create a new check queue explicit CCheckQueue(unsigned int nBatchSizeIn) - : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), - nBatchSize(nBatchSizeIn) {} - - //! Worker thread - void Thread() { Loop(); } + : nBatchSize(nBatchSizeIn) {} + + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) + { + { + LOCK(m_mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } + } //! Wait until execution finishes, and return whether all evaluations were //! successful. - bool Wait() { return Loop(true); } + bool Wait() { return Loop(true /* master thread */); } //! Add a batch of checks to the queue void Add(std::vector &vChecks) { - boost::unique_lock lock(mutex); + LOCK(m_mutex); for (T &check : vChecks) { queue.push_back(T()); check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) { - condWorker.notify_one(); + m_worker_cv.notify_one(); } else if (vChecks.size() > 1) { - condWorker.notify_all(); + m_worker_cv.notify_all(); + } + } + + //! Stop all of the worker threads. + void StopWorkerThreads() + { + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); } + m_worker_threads.clear(); + WITH_LOCK(m_mutex, m_request_stop = false); } - ~CCheckQueue() {} + ~CCheckQueue() { assert(m_worker_threads.empty()); } }; /** @@ -180,7 +212,7 @@ template class CCheckQueueControl { : pqueue(pqueueIn), fDone(false) { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - ENTER_CRITICAL_SECTION(pqueue->ControlMutex); + ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); } } @@ -204,7 +236,7 @@ template class CCheckQueueControl { Wait(); } if (pqueue != nullptr) { - LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); } } }; diff --git a/src/init.cpp b/src/init.cpp index b88c72f8cf..335cb4c2e2 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -247,6 +247,7 @@ void Shutdown(NodeContext &node) { // the CScheduler/checkqueue threadGroup threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been // stopped, destruct and reset all to nullptr. @@ -2105,9 +2106,7 @@ bool AppInitMain(Config &config, RPCServer &rpcServer, LogPrintf("Script verification uses %d additional threads\n", script_threads); if (script_threads >= 1) { g_parallel_script_checks = true; - for (int i = 0; i < script_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_threads); } // Start the lightweight task scheduler thread diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index d6dc64337f..c0735fd92a 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -126,10 +126,7 @@ typedef CCheckQueue FrozenCleanup_Queue; */ static void Correct_Queue_range(std::vector range) { auto small_queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { small_queue->Thread(); }); - } + small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (const size_t i : range) { @@ -146,8 +143,7 @@ static void Correct_Queue_range(std::vector range) { BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } } - tg.interrupt_all(); - tg.join_all(); + small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -188,10 +184,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) { BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { fail_queue->Thread(); }); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); @@ -213,17 +206,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { BOOST_REQUIRE(success); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { fail_queue->Thread(); }); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (const bool end_fails : {true, false}) { @@ -238,8 +227,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { BOOST_REQUIRE(r != end_fails); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -247,10 +235,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { // more than once as well BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { queue->Thread(); }); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; @@ -271,8 +256,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { r = r && UniqueCheck::results.count(i) == 1; } BOOST_REQUIRE(r); - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that blocks which might allocate lots of memory free their memory @@ -283,10 +267,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { // time could leave the data hanging across a sequence of blocks. BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { queue->Thread(); }); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -306,19 +287,15 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks // have been destructed BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; bool fails = false; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&] { queue->Thread(); }); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -338,7 +315,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { } // Try to get control of the queue a bunch of times for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } { // Unfreeze (we need lock n case of spurious wakeup) @@ -349,9 +326,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { FrozenCleanupCheck::cv.notify_one(); // Wait for control to finish t0.join(); - tg.interrupt_all(); - tg.join_all(); BOOST_REQUIRE(!fails); + queue->StopWorkerThreads(); } /** Test that CCheckQueueControl is threadsafe */ @@ -399,7 +375,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { cv.wait(l, [&]() { return has_lock; }); bool fails = false; for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } has_tried = true; cv.notify_one(); diff --git a/src/test/setup_common.cpp b/src/test/setup_common.cpp index a91fea34eb..4f880e0ebf 100644 --- a/src/test/setup_common.cpp +++ b/src/test/setup_common.cpp @@ -124,9 +124,7 @@ TestingSetup::TestingSetup(const std::string &chainName) // Start script-checking threads. Set g_parallel_script_checks to true so they are used. constexpr int script_check_threads = 2; - for (int i = 0; i < script_check_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_check_threads); g_parallel_script_checks = true; g_banman = @@ -139,6 +137,7 @@ TestingSetup::TestingSetup(const std::string &chainName) TestingSetup::~TestingSetup() { threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); g_connman.reset(); diff --git a/src/test/transaction_tests.cpp b/src/test/transaction_tests.cpp index 533c216d22..376f432851 100644 --- a/src/test/transaction_tests.cpp +++ b/src/test/transaction_tests.cpp @@ -466,14 +466,10 @@ BOOST_AUTO_TEST_CASE(test_big_transaction) { // check all inputs concurrently, with the cache PrecomputedTransactionData txdata(tx); - boost::thread_group threadGroup; CCheckQueue scriptcheckqueue(128); CCheckQueueControl control(&scriptcheckqueue); - for (int i = 0; i < 20; i++) { - threadGroup.create_thread(std::bind(&CCheckQueue::Thread, - std::ref(scriptcheckqueue))); - } + scriptcheckqueue.StartWorkerThreads(20); std::vector coins; for (size_t i = 0; i < mtx.vin.size(); i++) { @@ -496,8 +492,7 @@ BOOST_AUTO_TEST_CASE(test_big_transaction) { bool controlCheck = control.Wait(); BOOST_CHECK(controlCheck); - threadGroup.interrupt_all(); - threadGroup.join_all(); + scriptcheckqueue.StopWorkerThreads(); } SignatureData CombineSignatures(const CMutableTransaction &input1, diff --git a/src/validation.cpp b/src/validation.cpp index 00e4fd9cea..009539274b 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1619,9 +1619,12 @@ static bool WriteUndoDataForBlock(const CBlockUndo &blockundo, static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) { + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() { + scriptcheckqueue.StopWorkerThreads(); } int32_t ComputeBlockVersion(const CBlockIndex *pindexPrev, diff --git a/src/validation.h b/src/validation.h index 9ddbea5f9d..036db1b48b 100644 --- a/src/validation.h +++ b/src/validation.h @@ -398,10 +398,10 @@ bool LoadChainTip(const Config &config) EXCLUSIVE_LOCKS_REQUIRED(cs_main); */ void UnloadBlockIndex(); -/** - * Run an instance of the script checking thread. - */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** * Check whether we are doing an initial block download (synchronizing from disk diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index d1c3cba13a..28f859a423 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -74,7 +74,6 @@ EXPECTED_BOOST_INCLUDES=( boost/thread.hpp boost/thread/condition_variable.hpp boost/thread/locks.hpp - boost/thread/mutex.hpp boost/thread/shared_mutex.hpp boost/thread/thread.hpp boost/variant.hpp