Skip to content

Commit

Permalink
Merge bitcoin#10179: Give CValidationInterface Support for calling no…
Browse files Browse the repository at this point in the history
…tifications on the CScheduler Thread

1f668b6 Expose if CScheduler is being serviced, assert its not in EmptyQueue (Matt Corallo)
3192975 Flush CValidationInterface callbacks prior to destruction (Matt Corallo)
08096bb Support more than one CScheduler thread for serial clients (Matt Corallo)
2fbf2db Add default arg to CScheduler to schedule() a callback now (Matt Corallo)
cda1429 Give CMainSignals a reference to the global scheduler (Matt Corallo)
3a19fed Make ValidationInterface signals-type-agnostic (Matt Corallo)
ff6a834 Use TestingSetup to DRY qt rpcnestedtests (Matt Corallo)

Tree-SHA512: fab91e34e30b080ed4d0a6d8c1214910e383c45440676e37be61d0bde6ae98d61e8903d22b846e95ba4e73a6ce788798350266feba246d8a2ab357e8523e4ac5
  • Loading branch information
laanwj authored and PastaPastaPasta committed Aug 17, 2019
1 parent 7482e4f commit f4f7a82
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 125 deletions.
18 changes: 17 additions & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,19 @@ void PrepareShutdown()
fFeeEstimatesInitialized = false;
}

// FlushStateToDisk generates a SetBestChain callback, which we should avoid missing
FlushStateToDisk();

// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
GetMainSignals().FlushBackgroundCallbacks();

// Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown
// would too. The only reason to do the above flushes is to let the wallet catch
// up with our current chain to avoid any strange pruning edge cases and make
// next startup faster by avoiding rescan.

{
LOCK(cs_main);
if (pcoinsTip != nullptr) {
Expand Down Expand Up @@ -334,6 +347,7 @@ void PrepareShutdown()
}
#endif
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();
}

/**
Expand All @@ -351,7 +365,7 @@ void Shutdown()
if(!fRequestRestart) {
PrepareShutdown();
}
// Shutdown part 2: Stop TOR thread and delete wallet instance
// Shutdown part 2: Stop TOR thread and delete wallet instance
StopTorControl();
#ifdef ENABLE_WALLET
for (CWalletRef pwallet : vpwallets) {
Expand Down Expand Up @@ -1532,6 +1546,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections
* that the server is there and will be ready later). Warmup mode will
Expand Down
31 changes: 3 additions & 28 deletions src/qt/test/rpcnestedtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "rpc/server.h"
#include "rpcconsole.h"
#include "test/testutil.h"
#include "test/test_bitcoin.h"
#include "univalue.h"
#include "util.h"

Expand All @@ -38,28 +39,15 @@ void RPCNestedTests::rpcNestedTests()
{
// do some test setup
// could be moved to a more generic place when we add more tests on QT level
const CChainParams& chainparams = Params();
RegisterAllCoreRPCCommands(tableRPC);
tableRPC.appendCommand("rpcNestedTest", &vRPCCommands[0]);
ClearDatadirCache();
std::string path = QDir::tempPath().toStdString() + "/" + strprintf("test_dash_qt_%lu_%i", (unsigned long)GetTime(), (int)(GetRand(100000)));
QDir dir(QString::fromStdString(path));
dir.mkpath(".");
gArgs.ForceSetArg("-datadir", path);
//mempool.setSanityCheck(1.0);
evoDb = new CEvoDB(1 << 20, true, true);
pblocktree = new CBlockTreeDB(1 << 20, true);
pcoinsdbview = new CCoinsViewDB(1 << 23, true);
deterministicMNManager = new CDeterministicMNManager(*evoDb);
llmq::InitLLMQSystem(*evoDb, nullptr, true);

pcoinsTip = new CCoinsViewCache(pcoinsdbview);
InitBlockIndex(chainparams);
{
CValidationState state;
bool ok = ActivateBestChain(state, chainparams);
QVERIFY(ok);
}

TestingSetup test;

SetRPCWarmupFinished();

Expand Down Expand Up @@ -152,18 +140,5 @@ void RPCNestedTests::rpcNestedTests()
QVERIFY_EXCEPTION_THROWN(RPCConsole::RPCExecuteCommandLine(result, "rpcNestedTest(abc,,)"), std::runtime_error); //don't tollerate empty arguments when using ,
#endif

UnloadBlockIndex();
delete pcoinsTip;
pcoinsTip = nullptr;
llmq::DestroyLLMQSystem();
delete deterministicMNManager;
deterministicMNManager = nullptr;
delete pcoinsdbview;
pcoinsdbview = nullptr;
delete pblocktree;
pblocktree = nullptr;
delete evoDb;
evoDb = nullptr;

fs::remove_all(fs::path(path));
}
66 changes: 66 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,69 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}

bool CScheduler::AreThreadsServicingQueue() const {
return nThreadsServicingQueue;
}


void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
{
LOCK(m_cs_callbacks_pending);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
}

void SingleThreadedSchedulerClient::ProcessQueue() {
std::function<void (void)> callback;
{
LOCK(m_cs_callbacks_pending);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;

callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}

// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning() {
{
LOCK(instance->m_cs_callbacks_pending);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);

callback();
}

void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
assert(m_pscheduler);

{
LOCK(m_cs_callbacks_pending);
m_callbacks_pending.emplace_back(std::move(func));
}
MaybeScheduleProcessQueue();
}

void SingleThreadedSchedulerClient::EmptyQueue() {
assert(!m_pscheduler->AreThreadsServicingQueue());
bool should_continue = true;
while (should_continue) {
ProcessQueue();
LOCK(m_cs_callbacks_pending);
should_continue = !m_callbacks_pending.empty();
}
}
33 changes: 32 additions & 1 deletion src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <boost/thread.hpp>
#include <map>

#include "sync.h"

//
// Simple class for background tasks that should be run
// periodically or once "after a while"
Expand Down Expand Up @@ -41,7 +43,7 @@ class CScheduler
typedef std::function<void(void)> Function;

// Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t);
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());

// Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
Expand Down Expand Up @@ -69,6 +71,9 @@ class CScheduler
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;

// Returns true if there are threads actively running in serviceQueue()
bool AreThreadsServicingQueue() const;

private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
Expand All @@ -79,4 +84,30 @@ class CScheduler
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

/**
* Class used by CScheduler clients which may schedule multiple jobs
* which are required to be run serially. Does not require such jobs
* to be executed on the same thread, but no two jobs will be executed
* at the same time.
*/
class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;

CCriticalSection m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;

void MaybeScheduleProcessQueue();
void ProcessQueue();

public:
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);

// Processes all remaining queue members on the calling thread, blocking until queue is empty
// Must be called after the CScheduler has no remaining processing threads!
void EmptyQueue();
};

#endif
7 changes: 7 additions & 0 deletions src/test/test_dash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
pathTemp = GetTempPath() / strprintf("test_dash_%lu_%i", (unsigned long)GetTime(), (int)(InsecureRandRange(100000)));
fs::create_directories(pathTemp);
gArgs.ForceSetArg("-datadir", pathTemp.string());

// Note that because we don't bother running a scheduler thread here,
// callbacks via CValidationInterface are unreliable, but that's OK,
// our unit tests aren't testing multiple parts of the code at once.
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
mempool.setSanityCheck(1.0);
g_connman = std::unique_ptr<CConnman>(new CConnman(0x1337, 0x1337)); // Deterministic randomness for tests.
connman = g_connman.get();
Expand Down Expand Up @@ -101,6 +106,8 @@ TestingSetup::~TestingSetup()
llmq::InterruptLLMQSystem();
threadGroup.interrupt_all();
threadGroup.join_all();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
UnloadBlockIndex();
delete pcoinsTip;
llmq::DestroyLLMQSystem();
Expand Down
2 changes: 2 additions & 0 deletions src/test/test_dash.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "key.h"
#include "pubkey.h"
#include "random.h"
#include "scheduler.h"
#include "txdb.h"
#include "txmempool.h"

Expand Down Expand Up @@ -54,6 +55,7 @@ struct TestingSetup: public BasicTestingSetup {
fs::path pathTemp;
boost::thread_group threadGroup;
CConnman* connman;
CScheduler scheduler;

TestingSetup(const std::string& chainName = CBaseChainParams::MAIN);
~TestingSetup();
Expand Down
Loading

0 comments on commit f4f7a82

Please sign in to comment.