Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry locking of child TXs in batches instead of per locked parent #2858

Merged
merged 1 commit into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions src/llmq/quorums_instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,8 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
if (pindexMined) {
db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
}

pendingRetryTxs.emplace(islock.txid);
}

CInv inv(MSG_ISLOCK, hash);
Expand All @@ -779,8 +781,6 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
}

RemoveMempoolConflictsForLock(hash, islock);
RetryLockTxs(islock.txid);

UpdateWalletTransaction(islock.txid, tx);
}

Expand Down Expand Up @@ -836,7 +836,8 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn

bool chainlocked = pindex && chainLocksHandler->HasChainLock(pindex->nHeight, pindex->GetBlockHash());
if (!islockHash.IsNull() || chainlocked) {
RetryLockTxs(tx.GetHash());
LOCK(cs);
pendingRetryTxs.emplace(tx.GetHash());
} else {
ProcessTx(tx, Params().GetConsensus());
}
Expand Down Expand Up @@ -883,14 +884,14 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
inputRequestIds.erase(inputRequestId);
}
}

// Retry all not yet locked mempool TXs and TX which where mined after the fully confirmed block
pendingRetryAllTxs = true;
}

for (auto& p : removeISLocks) {
UpdateWalletTransaction(p.second->txid, nullptr);
}

// Retry all not yet locked mempool TXs and TX which where mined after the fully confirmed block
RetryLockTxs(uint256());
}

void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
Expand All @@ -917,10 +918,23 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
}
}

void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
bool CInstantSendManager::ProcessPendingRetryLockTxs()
{
bool retryAllTxs;
decltype(pendingRetryTxs) parentTxs;
{
LOCK(cs);
retryAllTxs = pendingRetryAllTxs;
parentTxs = std::move(pendingRetryTxs);
pendingRetryAllTxs = false;
}

if (!retryAllTxs && parentTxs.empty()) {
return false;
}

if (!IsNewInstantSendEnabled()) {
return;
return false;
}

// Let's retry all unlocked TXs from mempool and and recently connected blocks
Expand All @@ -930,16 +944,18 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
{
LOCK(mempool.cs);

if (lockedParentTx.IsNull()) {
if (retryAllTxs) {
txs.reserve(mempool.mapTx.size());
for (auto it = mempool.mapTx.begin(); it != mempool.mapTx.end(); ++it) {
txs.emplace(it->GetTx().GetHash(), it->GetSharedTx());
}
} else {
auto it = mempool.mapNextTx.lower_bound(COutPoint(lockedParentTx, 0));
while (it != mempool.mapNextTx.end() && it->first->hash == lockedParentTx) {
txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
++it;
for (const auto& parentTx : parentTxs) {
auto it = mempool.mapNextTx.lower_bound(COutPoint(parentTx, 0));
while (it != mempool.mapNextTx.end() && it->first->hash == parentTx) {
txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
++it;
}
}
}
}
Expand Down Expand Up @@ -969,12 +985,12 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
}

for (const auto& tx : block.vtx) {
if (lockedParentTx.IsNull()) {
if (retryAllTxs) {
txs.emplace(tx->GetHash(), tx);
} else {
bool isChild = false;
for (auto& in : tx->vin) {
if (in.prevout.hash == lockedParentTx) {
if (parentTxs.count(in.prevout.hash)) {
isChild = true;
break;
}
Expand All @@ -989,6 +1005,7 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
depth++;
}

bool didWork = false;
for (auto& p : txs) {
auto& tx = p.second;
{
Expand Down Expand Up @@ -1017,7 +1034,10 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
}

ProcessTx(*tx, Params().GetConsensus());
didWork = true;
}

return didWork;
}

bool CInstantSendManager::AlreadyHave(const CInv& inv)
Expand Down Expand Up @@ -1089,6 +1109,7 @@ void CInstantSendManager::WorkThreadMain()
bool didWork = false;

didWork |= ProcessPendingInstantSendLocks();
didWork |= ProcessPendingRetryLockTxs();

if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
Expand Down
6 changes: 5 additions & 1 deletion src/llmq/quorums_instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class CInstantSendManager : public CRecoveredSigsListener
// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;

// a set of recently IS locked TXs for which we can retry locking of children
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs;
bool pendingRetryAllTxs{false};

public:
CInstantSendManager(CDBWrapper& _llmqDb);
~CInstantSendManager();
Expand Down Expand Up @@ -129,7 +133,7 @@ class CInstantSendManager : public CRecoveredSigsListener
void HandleFullyConfirmedBlock(const CBlockIndex* pindex);

void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock);
void RetryLockTxs(const uint256& lockedParentTx);
bool ProcessPendingRetryLockTxs();

bool AlreadyHave(const CInv& inv);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
Expand Down