Skip to content

Commit

Permalink
Separate init/destroy and start/stop steps in LLMQ flow (#2709)
Browse files Browse the repository at this point in the history
  • Loading branch information
UdjinM6 committed Feb 17, 2019
1 parent 9f58690 commit 26db020
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 19 deletions.
3 changes: 3 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ void PrepareShutdown()
StopREST();
StopRPC();
StopHTTPServer();
llmq::StopLLMQSystem();

// fRPCInWarmup should be `false` if we completed the loading sequence
// before a shutdown request was received
Expand Down Expand Up @@ -2066,6 +2067,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
#endif // ENABLE_WALLET
}

llmq::StartLLMQSystem();

// ********************************************************* Step 12: start node

//// debug print
Expand Down
10 changes: 9 additions & 1 deletion src/llmq/quorums_chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ std::string CChainLockSig::ToString() const
CChainLocksHandler::CChainLocksHandler(CScheduler* _scheduler) :
scheduler(_scheduler)
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}

CChainLocksHandler::~CChainLocksHandler()
{
}

void CChainLocksHandler::RegisterAsRecoveredSigsListener()
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}

void CChainLocksHandler::UnregisterAsRecoveredSigsListener()
{
quorumSigningManager->UnregisterRecoveredSigsListener(this);
}
Expand Down
4 changes: 3 additions & 1 deletion src/llmq/quorums_chainlocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class CChainLocksHandler : public CRecoveredSigsListener
CChainLocksHandler(CScheduler* _scheduler);
~CChainLocksHandler();

public:
void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();

bool AlreadyHave(const CInv& inv);
bool GetChainLockByHash(const uint256& hash, CChainLockSig& ret);

Expand Down
4 changes: 4 additions & 0 deletions src/llmq/quorums_debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ UniValue CDKGDebugSessionStatus::ToJson(int detailLevel) const

CDKGDebugManager::CDKGDebugManager(CScheduler* _scheduler) :
scheduler(_scheduler)
{
}

void CDKGDebugManager::StartScheduler()
{
if (scheduler) {
scheduler->scheduleEvery([&]() {
Expand Down
2 changes: 2 additions & 0 deletions src/llmq/quorums_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class CDKGDebugManager
public:
CDKGDebugManager(CScheduler* _scheduler);

void StartScheduler();

void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
bool PreVerifyDebugStatusMessage(const uint256& hash, CDKGDebugStatus& status, bool& retBan);
void ScheduleProcessPending();
Expand Down
12 changes: 10 additions & 2 deletions src/llmq/quorums_dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@ static const std::string DB_SKCONTRIB = "qdkg_S";
CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) :
evoDb(_evoDb),
blsWorker(_blsWorker)
{
}

CDKGSessionManager::~CDKGSessionManager()
{
}

void CDKGSessionManager::StartMessageHandlerPool()
{
for (const auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, _evoDb, messageHandlerPool, blsWorker, *this));
std::forward_as_tuple(qt.second, evoDb, messageHandlerPool, blsWorker, *this));
}

messageHandlerPool.resize(2);
RenameThreadPool(messageHandlerPool, "quorum-msg");
}

CDKGSessionManager::~CDKGSessionManager()
void CDKGSessionManager::StopMessageHandlerPool()
{
messageHandlerPool.stop(true);
}
Expand Down
3 changes: 3 additions & 0 deletions src/llmq/quorums_dkgsessionmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class CDKGSessionManager
CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker);
~CDKGSessionManager();

void StartMessageHandlerPool();
void StopMessageHandlerPool();

void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);

void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
Expand Down
43 changes: 36 additions & 7 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
chainLocksHandler = new CChainLocksHandler(scheduler);
}

void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
}

void DestroyLLMQSystem()
{
delete chainLocksHandler;
Expand All @@ -56,4 +49,40 @@ void DestroyLLMQSystem()
quorumDKGDebugManager = nullptr;
}

void StartLLMQSystem()
{
if (quorumDKGDebugManager) {
quorumDKGDebugManager->StartScheduler();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StartMessageHandlerPool();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StartWorkerThread();
}
if (chainLocksHandler) {
chainLocksHandler->RegisterAsRecoveredSigsListener();
}
}

void StopLLMQSystem()
{
if (chainLocksHandler) {
chainLocksHandler->UnregisterAsRecoveredSigsListener();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopMessageHandlerPool();
}
}

void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
}

}
6 changes: 5 additions & 1 deletion src/llmq/quorums_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ namespace llmq
// If true, we will connect to all new quorums and watch their communication
static const bool DEFAULT_WATCH_QUORUMS = false;

// Init/destroy LLMQ globals
void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests);
void InterruptLLMQSystem();
void DestroyLLMQSystem();

// Manage scheduled tasks, threads, listeners etc.
void StartLLMQSystem();
void StopLLMQSystem();
void InterruptLLMQSystem();
}

#endif //DASH_QUORUMS_INIT_H
15 changes: 11 additions & 4 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,32 @@ CSigSharesInv CBatchedSigShares::ToInv() const

CSigSharesManager::CSigSharesManager()
{
StartWorkerThread();
workInterrupt.reset();
}

CSigSharesManager::~CSigSharesManager()
{
StopWorkerThread();
}

void CSigSharesManager::StartWorkerThread()
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}

workThread = std::thread(&TraceThread<std::function<void()> >,
"sigshares",
std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
}

void CSigSharesManager::StopWorkerThread()
{
// make sure to call InterruptWorkerThread() first
if (!workInterrupt) {
assert(false);
}

if (workThread.joinable()) {
workThread.join();
}
Expand Down Expand Up @@ -1086,8 +1095,6 @@ void CSigSharesManager::BanNode(NodeId nodeId)

void CSigSharesManager::WorkThreadMain()
{
workInterrupt.reset();

int64_t lastSendTime = 0;

while (!workInterrupt) {
Expand Down
5 changes: 2 additions & 3 deletions src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ class CSigSharesManager
CSigSharesManager();
~CSigSharesManager();

void StartWorkerThread();
void StopWorkerThread();
void InterruptWorkerThread();

public:
Expand All @@ -370,9 +372,6 @@ class CSigSharesManager
void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);

private:
void StartWorkerThread();
void StopWorkerThread();

void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
Expand Down

0 comments on commit 26db020

Please sign in to comment.