Skip to content

Commit

Permalink
refactor: remove boost::thread_group usage
Browse files Browse the repository at this point in the history
Summary:
> Post [[bitcoin/bitcoin#18710 | core#18710]], there isn't much left using boost::thread_group, so should just be able to replace it with the standard library. This also removes the last use of `boost::thread_interrupted`.
>
> After this change, last piece of Boost Thread we'd be using is `boost::shared_mutex`. See the commentary [[bitcoin/bitcoin#16684 (comment)]] as to why it may be non-trivial to swap that for `std::shared_mutex` in the near future.

This is a backport of [[bitcoin/bitcoin#21016 | core#21016]] and [[bitcoin/bitcoin#22433 | core#22433]]

Test Plan:
With clang + debug and (separate run) TSAN
`ninja && ninja check check-functional`

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Subscribers: Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10992
  • Loading branch information
fanquake authored and PiRK committed Feb 4, 2022
1 parent 1121f6f commit 5d1d773
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/bitcoin-cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <univalue.h>

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <functional>
#include <memory>
Expand Down
11 changes: 4 additions & 7 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@

#include <boost/algorithm/string/replace.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/thread/thread.hpp>

#if ENABLE_ZMQ
#include <zmq/zmqabstractnotifier.h>
Expand All @@ -90,6 +89,8 @@
#include <cstdio>
#include <functional>
#include <set>
#include <thread>
#include <vector>

static const bool DEFAULT_PROXYRANDOMIZE = true;
static const bool DEFAULT_REST_ENABLE = false;
Expand Down Expand Up @@ -159,8 +160,6 @@ static std::unique_ptr<ECCVerifyHandle> globalVerifyHandle;

static std::thread g_load_block;

static boost::thread_group threadGroup;

void Interrupt(NodeContext &node) {
InterruptHTTPServer();
InterruptHTTPRPC();
Expand Down Expand Up @@ -243,15 +242,13 @@ void Shutdown(NodeContext &node) {
StopTorControl();

// After everything has been shut down, but before things get flushed, stop
// the CScheduler/checkqueue, threadGroup and load block thread.
// the CScheduler/checkqueue, scheduler and load block thread.
if (node.scheduler) {
node.scheduler->stop();
}
if (g_load_block.joinable()) {
g_load_block.join();
}
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();

// After the threads that potentially access these pointers have been
Expand Down Expand Up @@ -2287,7 +2284,7 @@ bool AppInitMain(Config &config, RPCServer &rpcServer,
node.scheduler = std::make_unique<CScheduler>();

// Start the lightweight task scheduler thread
threadGroup.create_thread([&] {
node.scheduler->m_service_thread = std::thread([&] {
TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); });
});

Expand Down
3 changes: 0 additions & 3 deletions src/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ struct BlockAndHeaderTipInfo;
}
class RPCServer;

namespace boost {
class thread_group;
} // namespace boost
namespace util {
class Ref;
} // namespace util
Expand Down
12 changes: 10 additions & 2 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <functional>
#include <list>
#include <map>
#include <thread>

/**
* Simple class for background tasks that should be run periodically or once
Expand All @@ -37,6 +38,8 @@ class CScheduler {
CScheduler();
~CScheduler();

std::thread m_service_thread;

typedef std::function<void()> Function;
typedef std::function<bool()> Predicate;

Expand Down Expand Up @@ -65,8 +68,7 @@ class CScheduler {
void MockForward(std::chrono::seconds delta_seconds);

/**
* Services the queue 'forever'. Should be run in a thread, and interrupted
* using boost::interrupt_thread
* Services the queue 'forever'. Should be run in a thread.
*/
void serviceQueue();

Expand All @@ -77,6 +79,9 @@ class CScheduler {
void stop() {
WITH_LOCK(newTaskMutex, stopRequested = true);
newTaskScheduled.notify_all();
if (m_service_thread.joinable()) {
m_service_thread.join();
}
}

/**
Expand All @@ -86,6 +91,9 @@ class CScheduler {
void StopWhenDrained() {
WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
newTaskScheduled.notify_all();
if (m_service_thread.joinable()) {
m_service_thread.join();
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/script/sigcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <uint256.h>
#include <util/system.h>

#include <boost/thread/lock_types.hpp>
#include <boost/thread/shared_mutex.hpp>

namespace {
Expand Down
20 changes: 14 additions & 6 deletions src/test/checkqueue_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,27 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) {
BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) {
auto queue = std::make_unique<Standard_Queue>(QUEUE_BATCH_SIZE);
{
boost::thread_group tg;
std::vector<std::thread> tg;
std::atomic<int> nThreads{0};
std::atomic<int> fails{0};
for (size_t i = 0; i < 3; ++i) {
tg.create_thread([&] {
tg.emplace_back([&] {
CCheckQueueControl<FakeCheck> control(queue.get());
// While sleeping, no other thread should execute to this point
auto observed = ++nThreads;
UninterruptibleSleep(std::chrono::milliseconds{10});
fails += observed != nThreads;
});
}
tg.join_all();
for (auto &thread : tg) {
if (thread.joinable()) {
thread.join();
}
}
BOOST_REQUIRE_EQUAL(fails, 0);
}
{
boost::thread_group tg;
std::vector<std::thread> tg;
std::mutex m;
std::condition_variable cv;
bool has_lock{false};
Expand All @@ -365,7 +369,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) {
bool done_ack{false};
{
std::unique_lock<std::mutex> l(m);
tg.create_thread([&] {
tg.emplace_back([&] {
CCheckQueueControl<FakeCheck> control(queue.get());
std::unique_lock<std::mutex> ll(m);
has_lock = true;
Expand All @@ -391,7 +395,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) {
cv.notify_one();
BOOST_REQUIRE(!fails);
}
tg.join_all();
for (auto &thread : tg) {
if (thread.joinable()) {
thread.join();
}
}
}
}
BOOST_AUTO_TEST_SUITE_END()
1 change: 1 addition & 0 deletions src/test/cuckoocache_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <test/util/setup_common.h>

#include <boost/test/unit_test.hpp>
#include <boost/thread/lock_types.hpp>
#include <boost/thread/shared_mutex.hpp>

/**
Expand Down
26 changes: 17 additions & 9 deletions src/test/scheduler_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
#include <sync.h>

#include <boost/test/unit_test.hpp>
#include <boost/thread/thread.hpp>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

BOOST_AUTO_TEST_SUITE(scheduler_tests)

Expand Down Expand Up @@ -89,9 +89,9 @@ BOOST_AUTO_TEST_CASE(manythreads) {

// As soon as these are created they will start running and servicing the
// queue
boost::thread_group microThreads;
std::vector<std::thread> microThreads;
for (int i = 0; i < 5; i++) {
microThreads.create_thread(
microThreads.emplace_back(
std::bind(&CScheduler::serviceQueue, &microTasks));
}

Expand All @@ -100,7 +100,7 @@ BOOST_AUTO_TEST_CASE(manythreads) {

// More threads and more tasks:
for (int i = 0; i < 5; i++) {
microThreads.create_thread(
microThreads.emplace_back(
std::bind(&CScheduler::serviceQueue, &microTasks));
}

Expand All @@ -119,8 +119,12 @@ BOOST_AUTO_TEST_CASE(manythreads) {

// Drain the task queue then exit threads
microTasks.StopWhenDrained();
// ... wait until all the threads are done
microThreads.join_all();
// wait until all the threads are done
for (auto &thread : microThreads) {
if (thread.joinable()) {
thread.join();
}
}

int counterSum = 0;
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -208,9 +212,9 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) {
// if the queues only permit execution of one task at once then
// the extra threads should effectively be doing nothing
// if they don't we'll get out of order behaviour
boost::thread_group threads;
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler));
threads.emplace_back(std::bind(&CScheduler::serviceQueue, &scheduler));
}

// these are not atomic, if SinglethreadedSchedulerClient prevents
Expand All @@ -236,7 +240,11 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) {

// finish up
scheduler.StopWhenDrained();
threads.join_all();
for (auto &thread : threads) {
if (thread.joinable()) {
thread.join();
}
}

BOOST_CHECK_EQUAL(counter1, 100);
BOOST_CHECK_EQUAL(counter2, 100);
Expand Down
4 changes: 1 addition & 3 deletions src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ TestingSetup::TestingSetup(const std::string &chainName,

// We have to run a scheduler thread to prevent ActivateBestChain
// from blocking due to queue overrun.
threadGroup.create_thread([&] {
m_node.scheduler->m_service_thread = std::thread([&] {
TraceThread("scheduler", [&] { m_node.scheduler->serviceQueue(); });
});
GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler);
Expand Down Expand Up @@ -224,8 +224,6 @@ TestingSetup::~TestingSetup() {
if (m_node.scheduler) {
m_node.scheduler->stop();
}
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
Expand Down
5 changes: 1 addition & 4 deletions src/test/util/setup_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
#include <util/string.h>
#include <util/system.h>

#include <boost/thread/thread.hpp>

#include <type_traits>
#include <vector>

/**
* Version of Boost::test prior to 1.64 have issues when dealing with nullptr_t.
Expand Down Expand Up @@ -117,8 +116,6 @@ struct BasicTestingSetup {
* Included are coins database, script check threads setup.
*/
struct TestingSetup : public BasicTestingSetup {
boost::thread_group threadGroup;

explicit TestingSetup(const std::string &chainName = CBaseChainParams::MAIN,
const std::vector<const char *> &extra_args = {});
~TestingSetup();
Expand Down
5 changes: 0 additions & 5 deletions src/util/system.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include <util/threadnames.h>
#include <util/time.h>

#include <boost/thread/condition_variable.hpp> // for boost::thread_interrupted

#include <cstdint>
#include <exception>
#include <map>
Expand Down Expand Up @@ -470,9 +468,6 @@ template <typename Callable> void TraceThread(const char *name, Callable func) {
LogPrintf("%s thread start\n", name);
func();
LogPrintf("%s thread exit\n", name);
} catch (const boost::thread_interrupted &) {
LogPrintf("%s thread interrupt\n", name);
throw;
} catch (const std::exception &e) {
PrintExceptionContinue(&e, name);
throw;
Expand Down
3 changes: 1 addition & 2 deletions test/lint/lint-boost-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ EXPECTED_BOOST_INCLUDES=(
boost/signals2/optional_last_value.hpp
boost/signals2/signal.hpp
boost/test/unit_test.hpp
boost/thread/condition_variable.hpp
boost/thread/lock_types.hpp
boost/thread/shared_mutex.hpp
boost/thread/thread.hpp
boost/variant.hpp
boost/variant/apply_visitor.hpp
boost/variant/static_visitor.hpp
Expand Down

0 comments on commit 5d1d773

Please sign in to comment.