Skip to content

Commit

Permalink
Move processing of InstantSend locks into its own worker thread (#2857)
Browse files Browse the repository at this point in the history
* Let ProcessPendingInstantSendLocks return true when it did some work

* Introduce own worker thread for CInstantSendManager

Instead of using the scheduler.

* Remove scheduler from CInstantSendManager

* Add missing reset() call for workInterrupt
  • Loading branch information
codablock authored and UdjinM6 committed Apr 11, 2019
1 parent ae78360 commit 90b1b71
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 27 deletions.
9 changes: 6 additions & 3 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests, bool f
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(*llmqDb, unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);
quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb);
quorumInstantSendManager = new CInstantSendManager(*llmqDb);
}

void DestroyLLMQSystem()
Expand Down Expand Up @@ -84,14 +84,14 @@ void StartLLMQSystem()
chainLocksHandler->Start();
}
if (quorumInstantSendManager) {
quorumInstantSendManager->RegisterAsRecoveredSigsListener();
quorumInstantSendManager->Start();
}
}

void StopLLMQSystem()
{
if (quorumInstantSendManager) {
quorumInstantSendManager->UnregisterAsRecoveredSigsListener();
quorumInstantSendManager->Stop();
}
if (chainLocksHandler) {
chainLocksHandler->Stop();
Expand All @@ -113,6 +113,9 @@ void InterruptLLMQSystem()
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
if (quorumInstantSendManager) {
quorumInstantSendManager->InterruptWorkerThread();
}
}

}
66 changes: 50 additions & 16 deletions src/llmq/quorums_instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "txmempool.h"
#include "masternode-sync.h"
#include "net_processing.h"
#include "scheduler.h"
#include "spork.h"
#include "validation.h"

Expand All @@ -24,6 +23,7 @@
#include "instantx.h"

#include <boost/algorithm/string/replace.hpp>
#include <boost/thread.hpp>

namespace llmq
{
Expand Down Expand Up @@ -208,24 +208,45 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o

////////////////

CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) :
scheduler(_scheduler),
CInstantSendManager::CInstantSendManager(CDBWrapper& _llmqDb) :
db(_llmqDb)
{
workInterrupt.reset();
}

CInstantSendManager::~CInstantSendManager()
{
}

void CInstantSendManager::RegisterAsRecoveredSigsListener()
void CInstantSendManager::Start()
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}

workThread = std::thread(&TraceThread<std::function<void()> >, "instantsend", std::function<void()>(std::bind(&CInstantSendManager::WorkThreadMain, this)));

quorumSigningManager->RegisterRecoveredSigsListener(this);
}

void CInstantSendManager::UnregisterAsRecoveredSigsListener()
void CInstantSendManager::Stop()
{
quorumSigningManager->UnregisterRecoveredSigsListener(this);

// make sure to call InterruptWorkerThread() first
if (!workInterrupt) {
assert(false);
}

if (workThread.joinable()) {
workThread.join();
}
}

void CInstantSendManager::InterruptWorkerThread()
{
workInterrupt();
}

bool CInstantSendManager::ProcessTx(const CTransaction& tx, const Consensus::Params& params)
Expand Down Expand Up @@ -552,13 +573,6 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq
islock.txid.ToString(), hash.ToString(), pfrom->id);

pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom->id, std::move(islock)));

if (!hasScheduledProcessPending) {
hasScheduledProcessPending = true;
scheduler->scheduleFromNow([&] {
ProcessPendingInstantSendLocks();
}, 100);
}
}

bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CInstantSendLock& islock, bool& retBan)
Expand All @@ -581,20 +595,23 @@ bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CI
return true;
}

void CInstantSendManager::ProcessPendingInstantSendLocks()
bool CInstantSendManager::ProcessPendingInstantSendLocks()
{
auto llmqType = Params().GetConsensus().llmqForInstantSend;

decltype(pendingInstantSendLocks) pend;

{
LOCK(cs);
hasScheduledProcessPending = false;
pend = std::move(pendingInstantSendLocks);
}

if (pend.empty()) {
return false;
}

if (!IsNewInstantSendEnabled()) {
return;
return false;
}

int tipHeight;
Expand All @@ -621,7 +638,7 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
auto quorum = quorumSigningManager->SelectQuorumForSigning(llmqType, tipHeight, id);
if (!quorum) {
// should not happen, but if one fails to select, all others will also fail to select
return;
return false;
}
uint256 signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc.quorumHash, id, islock.txid);
batchVerifier.PushMessage(nodeId, hash, signHash, islock.sig, quorum->qc.quorumPublicKey);
Expand Down Expand Up @@ -679,6 +696,8 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
}
}
}

return true;
}

void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock)
Expand Down Expand Up @@ -1052,6 +1071,21 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC
return false;
}

void CInstantSendManager::WorkThreadMain()
{
while (!workInterrupt) {
bool didWork = false;

didWork |= ProcessPendingInstantSendLocks();

if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}
}

bool IsOldInstantSendEnabled()
{
return sporkManager.IsSporkActive(SPORK_2_INSTANTSEND_ENABLED) && !sporkManager.IsSporkActive(SPORK_20_INSTANTSEND_LLMQ_BASED);
Expand Down
18 changes: 10 additions & 8 deletions src/llmq/quorums_instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
#include <unordered_map>
#include <unordered_set>

class CScheduler;

namespace llmq
{

Expand Down Expand Up @@ -72,9 +70,11 @@ class CInstantSendManager : public CRecoveredSigsListener
{
private:
CCriticalSection cs;
CScheduler* scheduler;
CInstantSendDb db;

std::thread workThread;
CThreadInterrupt workInterrupt;

/**
* Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an
* in-progress input lock.
Expand All @@ -92,14 +92,14 @@ class CInstantSendManager : public CRecoveredSigsListener

// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
bool hasScheduledProcessPending{false};

public:
CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb);
CInstantSendManager(CDBWrapper& _llmqDb);
~CInstantSendManager();

void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();
void Start();
void Stop();
void InterruptWorkerThread();

public:
bool ProcessTx(const CTransaction& tx, const Consensus::Params& params);
Expand All @@ -118,7 +118,7 @@ class CInstantSendManager : public CRecoveredSigsListener
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
void ProcessMessageInstantSendLock(CNode* pfrom, const CInstantSendLock& islock, CConnman& connman);
bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan);
void ProcessPendingInstantSendLocks();
bool ProcessPendingInstantSendLocks();
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock);
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);

Expand All @@ -133,6 +133,8 @@ class CInstantSendManager : public CRecoveredSigsListener

bool AlreadyHave(const CInv& inv);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);

void WorkThreadMain();
};

extern CInstantSendManager* quorumInstantSendManager;
Expand Down

0 comments on commit 90b1b71

Please sign in to comment.