Permalink
Browse files

Merge #10179: Give CValidationInterface Support for calling notificat…

…ions on the CScheduler Thread


1f668b6 Expose if CScheduler is being serviced, assert its not in EmptyQueue (Matt Corallo)
3192975 Flush CValidationInterface callbacks prior to destruction (Matt Corallo)
08096bb Support more than one CScheduler thread for serial clients (Matt Corallo)
2fbf2db Add default arg to CScheduler to schedule() a callback now (Matt Corallo)
cda1429 Give CMainSignals a reference to the global scheduler (Matt Corallo)
3a19fed Make ValidationInterface signals-type-agnostic (Matt Corallo)
ff6a834 Use TestingSetup to DRY qt rpcnestedtests (Matt Corallo)

Tree-SHA512: fab91e34e30b080ed4d0a6d8c1214910e383c45440676e37be61d0bde6ae98d61e8903d22b846e95ba4e73a6ce788798350266feba246d8a2ab357e8523e4ac5
  • Loading branch information...
2 parents 9edda0c + 1f668b6 commit 21ed30a314cf2118bd4719c6a09985f69fd25dbf @laanwj laanwj committed Jul 11, 2017
Showing with 274 additions and 73 deletions.
  1. +16 −0 src/init.cpp
  2. +3 −19 src/qt/test/rpcnestedtests.cpp
  3. +66 −0 src/scheduler.cpp
  4. +32 −1 src/scheduler.h
  5. +8 −0 src/test/test_bitcoin.cpp
  6. +2 −0 src/test/test_bitcoin.h
  7. +104 −27 src/validationinterface.cpp
  8. +43 −26 src/validationinterface.h
View
@@ -215,6 +215,19 @@ void Shutdown()
fFeeEstimatesInitialized = false;
}
+ // FlushStateToDisk generates a SetBestChain callback, which we should avoid missing
+ FlushStateToDisk();
+
+ // After there are no more peers/RPC left to give us new data which may generate
+ // CValidationInterface callbacks, flush them...
+ GetMainSignals().FlushBackgroundCallbacks();
+
+ // Any future callbacks will be dropped. This should absolutely be safe - if
+ // missing a callback results in an unrecoverable situation, unclean shutdown
+ // would too. The only reason to do the above flushes is to let the wallet catch
+ // up with our current chain to avoid any strange pruning edge cases and make
+ // next startup faster by avoiding rescan.
+
{
LOCK(cs_main);
if (pcoinsTip != NULL) {
@@ -251,6 +264,7 @@ void Shutdown()
}
#endif
UnregisterAllValidationInterfaces();
+ GetMainSignals().UnregisterBackgroundSignalScheduler();
#ifdef ENABLE_WALLET
for (CWalletRef pwallet : vpwallets) {
delete pwallet;
@@ -1203,6 +1217,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
+ GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
+
/* Start the RPC server already. It will be started in "warmup" mode
* and not really process calls already (but it will signify connections
* that the server is there and will be ready later). Warmup mode will
@@ -12,6 +12,7 @@
#include "rpc/server.h"
#include "rpcconsole.h"
#include "test/testutil.h"
+#include "test/test_bitcoin.h"
#include "univalue.h"
#include "util.h"
@@ -35,24 +36,15 @@ void RPCNestedTests::rpcNestedTests()
{
// do some test setup
// could be moved to a more generic place when we add more tests on QT level
- const CChainParams& chainparams = Params();
- RegisterAllCoreRPCCommands(tableRPC);
tableRPC.appendCommand("rpcNestedTest", &vRPCCommands[0]);
ClearDatadirCache();
std::string path = QDir::tempPath().toStdString() + "/" + strprintf("test_bitcoin_qt_%lu_%i", (unsigned long)GetTime(), (int)(GetRand(100000)));
QDir dir(QString::fromStdString(path));
dir.mkpath(".");
ForceSetArg("-datadir", path);
//mempool.setSanityCheck(1.0);
- pblocktree = new CBlockTreeDB(1 << 20, true);
- pcoinsdbview = new CCoinsViewDB(1 << 23, true);
- pcoinsTip = new CCoinsViewCache(pcoinsdbview);
- InitBlockIndex(chainparams);
- {
- CValidationState state;
- bool ok = ActivateBestChain(state, chainparams);
- QVERIFY(ok);
- }
+
+ TestingSetup test;
SetRPCWarmupFinished();
@@ -145,13 +137,5 @@ void RPCNestedTests::rpcNestedTests()
QVERIFY_EXCEPTION_THROWN(RPCConsole::RPCExecuteCommandLine(result, "rpcNestedTest(abc,,)"), std::runtime_error); //don't tollerate empty arguments when using ,
#endif
- UnloadBlockIndex();
- delete pcoinsTip;
- pcoinsTip = nullptr;
- delete pcoinsdbview;
- pcoinsdbview = nullptr;
- delete pblocktree;
- pblocktree = nullptr;
-
fs::remove_all(fs::path(path));
}
View
@@ -139,3 +139,69 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}
+
+bool CScheduler::AreThreadsServicingQueue() const {
+ return nThreadsServicingQueue;
+}
+
+
+void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
+ {
+ LOCK(m_cs_callbacks_pending);
+ // Try to avoid scheduling too many copies here, but if we
+ // accidentally have two ProcessQueue's scheduled at once its
+ // not a big deal.
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ }
+ m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
+}
+
+void SingleThreadedSchedulerClient::ProcessQueue() {
+ std::function<void (void)> callback;
+ {
+ LOCK(m_cs_callbacks_pending);
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ m_are_callbacks_running = true;
+
+ callback = std::move(m_callbacks_pending.front());
+ m_callbacks_pending.pop_front();
+ }
+
+ // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
+ // to ensure both happen safely even if callback() throws.
+ struct RAIICallbacksRunning {
+ SingleThreadedSchedulerClient* instance;
+ RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
+ ~RAIICallbacksRunning() {
+ {
+ LOCK(instance->m_cs_callbacks_pending);
+ instance->m_are_callbacks_running = false;
+ }
+ instance->MaybeScheduleProcessQueue();
+ }
+ } raiicallbacksrunning(this);
+
+ callback();
+}
+
+void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
+ assert(m_pscheduler);
+
+ {
+ LOCK(m_cs_callbacks_pending);
+ m_callbacks_pending.emplace_back(std::move(func));
+ }
+ MaybeScheduleProcessQueue();
+}
+
+void SingleThreadedSchedulerClient::EmptyQueue() {
+ assert(!m_pscheduler->AreThreadsServicingQueue());
+ bool should_continue = true;
+ while (should_continue) {
+ ProcessQueue();
+ LOCK(m_cs_callbacks_pending);
+ should_continue = !m_callbacks_pending.empty();
+ }
+}
View
@@ -14,6 +14,8 @@
#include <boost/thread.hpp>
#include <map>
+#include "sync.h"
+
//
// Simple class for background tasks that should be run
// periodically or once "after a while"
@@ -41,7 +43,7 @@ class CScheduler
typedef std::function<void(void)> Function;
// Call func at/after time t
- void schedule(Function f, boost::chrono::system_clock::time_point t);
+ void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
// Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@@ -69,6 +71,9 @@ class CScheduler
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;
+ // Returns true if there are threads actively running in serviceQueue()
+ bool AreThreadsServicingQueue() const;
+
private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
@@ -79,4 +84,30 @@ class CScheduler
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};
+/**
+ * Class used by CScheduler clients which may schedule multiple jobs
+ * which are required to be run serially. Does not require such jobs
+ * to be executed on the same thread, but no two jobs will be executed
+ * at the same time.
+ */
+class SingleThreadedSchedulerClient {
+private:
+ CScheduler *m_pscheduler;
+
+ CCriticalSection m_cs_callbacks_pending;
+ std::list<std::function<void (void)>> m_callbacks_pending;
+ bool m_are_callbacks_running = false;
+
+ void MaybeScheduleProcessQueue();
+ void ProcessQueue();
+
+public:
+ SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
+ void AddToProcessQueue(std::function<void (void)> func);
+
+ // Processes all remaining queue members on the calling thread, blocking until queue is empty
+ // Must be called after the CScheduler has no remaining processing threads!
+ void EmptyQueue();
+};
+
#endif
@@ -62,6 +62,12 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
pathTemp = GetTempPath() / strprintf("test_bitcoin_%lu_%i", (unsigned long)GetTime(), (int)(InsecureRandRange(100000)));
fs::create_directories(pathTemp);
ForceSetArg("-datadir", pathTemp.string());
+
+ // Note that because we don't bother running a scheduler thread here,
+ // callbacks via CValidationInterface are unreliable, but that's OK,
+ // our unit tests aren't testing multiple parts of the code at once.
+ GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
+
mempool.setSanityCheck(1.0);
pblocktree = new CBlockTreeDB(1 << 20, true);
pcoinsdbview = new CCoinsViewDB(1 << 23, true);
@@ -88,6 +94,8 @@ TestingSetup::~TestingSetup()
UnregisterNodeSignals(GetNodeSignals());
threadGroup.interrupt_all();
threadGroup.join_all();
+ GetMainSignals().FlushBackgroundCallbacks();
+ GetMainSignals().UnregisterBackgroundSignalScheduler();
UnloadBlockIndex();
delete pcoinsTip;
delete pcoinsdbview;
View
@@ -10,6 +10,7 @@
#include "key.h"
#include "pubkey.h"
#include "random.h"
+#include "scheduler.h"
#include "txdb.h"
#include "txmempool.h"
@@ -53,6 +54,7 @@ struct TestingSetup: public BasicTestingSetup {
fs::path pathTemp;
boost::thread_group threadGroup;
CConnman* connman;
+ CScheduler scheduler;
TestingSetup(const std::string& chainName = CBaseChainParams::MAIN);
~TestingSetup();
Oops, something went wrong.

0 comments on commit 21ed30a

Please sign in to comment.