Skip to content

Commit

Permalink
Rework handling of CSigSharesManager worker thread (#2703)
Browse files Browse the repository at this point in the history
  • Loading branch information
UdjinM6 authored and codablock committed Feb 15, 2019
1 parent 3e4286a commit bedfc26
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC();
InterruptREST();
InterruptTorControl();
llmq::InterruptLLMQSystem();
if (g_connman)
g_connman->Interrupt();
threadGroup.interrupt_all();
Expand Down
9 changes: 5 additions & 4 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);

quorumSigSharesManager->StartWorkerThread();
}

void DestroyLLMQSystem()
void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
quorumSigSharesManager->InterruptWorkerThread();
}
}

void DestroyLLMQSystem()
{
delete chainLocksHandler;
chainLocksHandler = nullptr;
delete quorumSigningManager;
Expand Down
1 change: 1 addition & 0 deletions src/llmq/quorums_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace llmq
static const bool DEFAULT_WATCH_QUORUMS = false;

void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests);
void InterruptLLMQSystem();
void DestroyLLMQSystem();

}
Expand Down
34 changes: 22 additions & 12 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ CSigSharesInv CBatchedSigShares::ToInv() const

CSigSharesManager::CSigSharesManager()
{
StartWorkerThread();
}

CSigSharesManager::~CSigSharesManager()
Expand All @@ -185,24 +186,23 @@ CSigSharesManager::~CSigSharesManager()

void CSigSharesManager::StartWorkerThread()
{
workThread = std::thread([this]() {
RenameThread("quorum-sigshares");
WorkThreadMain();
});
workThread = std::thread(&TraceThread<std::function<void()> >,
"sigshares",
std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
}

void CSigSharesManager::StopWorkerThread()
{
if (stopWorkThread) {
return;
}

stopWorkThread = true;
if (workThread.joinable()) {
workThread.join();
}
}

void CSigSharesManager::InterruptWorkerThread()
{
workInterrupt();
}

void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
// non-masternodes are not interested in sigshares
Expand Down Expand Up @@ -1096,8 +1096,16 @@ void CSigSharesManager::BanNode(NodeId nodeId)

void CSigSharesManager::WorkThreadMain()
{
int64_t lastProcessTime = GetTimeMillis();
while (!stopWorkThread && !ShutdownRequested()) {
workInterrupt.reset();

while (!workInterrupt) {
if (!quorumSigningManager || !g_connman || !quorumSigningManager) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
continue;
}

RemoveBannedNodeStates();
quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman);
ProcessPendingSigShares(*g_connman);
Expand All @@ -1107,7 +1115,9 @@ void CSigSharesManager::WorkThreadMain()
quorumSigningManager->Cleanup();

// TODO Wakeup when pending signing is needed?
MilliSleep(100);
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class CSigSharesManager
CCriticalSection cs;

std::thread workThread;
std::atomic<bool> stopWorkThread{false};
CThreadInterrupt workInterrupt;

std::map<SigShareKey, CSigShare> sigShares;
std::map<uint256, int64_t> firstSeenForSessions;
Expand All @@ -214,8 +214,7 @@ class CSigSharesManager
CSigSharesManager();
~CSigSharesManager();

void StartWorkerThread();
void StopWorkerThread();
void InterruptWorkerThread();

public:
void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
Expand All @@ -224,6 +223,9 @@ class CSigSharesManager
void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);

private:
void StartWorkerThread();
void StopWorkerThread();

void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
Expand Down
1 change: 1 addition & 0 deletions src/test/test_dash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
TestingSetup::~TestingSetup()
{
UnregisterNodeSignals(GetNodeSignals());
llmq::InterruptLLMQSystem();
threadGroup.interrupt_all();
threadGroup.join_all();
UnloadBlockIndex();
Expand Down

0 comments on commit bedfc26

Please sign in to comment.