diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index 747b73e34cdb5..641be871cbf43 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -35,7 +35,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests) quorumSigSharesManager = new CSigSharesManager(); quorumSigningManager = new CSigningManager(*llmqDb, unitTests); chainLocksHandler = new CChainLocksHandler(scheduler); - quorumInstantSendManager = new CInstantSendManager(scheduler); + quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb); } void DestroyLLMQSystem() diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index 32ccdb749face..b79a14fbe8ffc 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -41,8 +41,122 @@ uint256 CInstantSendLock::GetRequestId() const return hw.GetHash(); } -CInstantSendManager::CInstantSendManager(CScheduler* _scheduler) : - scheduler(_scheduler) +//////////////// + +void CInstantSendDb::WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock) +{ + CDBBatch batch(db); + batch.Write(std::make_tuple(std::string("is_i"), hash), islock); + batch.Write(std::make_tuple(std::string("is_tx"), islock.txid), hash); + for (auto& in : islock.inputs) { + batch.Write(std::make_tuple(std::string("is_in"), in), hash); + } + db.WriteBatch(batch); + + auto p = std::make_shared(islock); + islockCache.insert(hash, p); + txidCache.insert(islock.txid, hash); + for (auto& in : islock.inputs) { + outpointCache.insert(in, hash); + } +} + +void CInstantSendDb::RemoveInstantSendLock(const uint256& hash, CInstantSendLockPtr islock) +{ + if (!islock) { + islock = GetInstantSendLockByHash(hash); + if (!islock) { + return; + } + } + + CDBBatch batch(db); + batch.Erase(std::make_tuple(std::string("is_i"), hash)); + batch.Erase(std::make_tuple(std::string("is_tx"), islock->txid)); + for (auto& in : islock->inputs) { + batch.Erase(std::make_tuple(std::string("is_in"), in)); + } + db.WriteBatch(batch); + + islockCache.erase(hash); + txidCache.erase(islock->txid); + for (auto& in : islock->inputs) { + outpointCache.erase(in); + } +} + +CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByHash(const uint256& hash) +{ + CInstantSendLockPtr ret; + if (islockCache.get(hash, ret)) { + return ret; + } + + ret = std::make_shared(); + bool exists = db.Read(std::make_tuple(std::string("is_i"), hash), *ret); + if (!exists) { + ret = nullptr; + } + islockCache.insert(hash, ret); + return ret; +} + +CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByTxid(const uint256& txid) +{ + uint256 islockHash; + + bool found = txidCache.get(txid, islockHash); + if (found && islockHash.IsNull()) { + return nullptr; + } + + if (!found) { + found = db.Read(std::make_tuple(std::string("is_tx"), txid), islockHash); + txidCache.insert(txid, islockHash); + } + + if (!found) { + return nullptr; + } + return GetInstantSendLockByHash(islockHash); +} + +CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& outpoint) +{ + uint256 islockHash; + bool found = outpointCache.get(outpoint, islockHash); + if (found && islockHash.IsNull()) { + return nullptr; + } + + if (!found) { + found = db.Read(std::make_tuple(std::string("is_in"), outpoint), islockHash); + outpointCache.insert(outpoint, islockHash); + } + + if (!found) { + return nullptr; + } + return GetInstantSendLockByHash(islockHash); +} + +void CInstantSendDb::WriteLastChainLockBlock(const uint256& hash) +{ + db.Write(std::make_tuple(std::string("is_lcb")), hash); +} + +uint256 CInstantSendDb::GetLastChainLockBlock() +{ + uint256 hashBlock; + db.Read(std::make_tuple(std::string("is_lcb")), hashBlock); + return hashBlock; +} + +//////////////// + +CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) : + scheduler(_scheduler), + db(_llmqDb) { } @@ -287,14 +401,13 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx) LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: got all recovered sigs, creating CInstantSendLock\n", __func__, tx.GetHash().ToString()); - CInstantSendLockInfo islockInfo; - islockInfo.time = GetTimeMillis(); - islockInfo.islock.txid = tx.GetHash(); + CInstantSendLock islock; + islock.txid = tx.GetHash(); for (auto& in : tx.vin) { - islockInfo.islock.inputs.emplace_back(in.prevout); + islock.inputs.emplace_back(in.prevout); } - auto id = islockInfo.islock.GetRequestId(); + auto id = islock.GetRequestId(); if (quorumSigningManager->HasRecoveredSigForId(llmqType, id)) { return; @@ -302,7 +415,7 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx) { LOCK(cs); - auto e = creatingInstantSendLocks.emplace(id, islockInfo); + auto e = creatingInstantSendLocks.emplace(id, std::move(islock)); if (!e.second) { return; } @@ -314,7 +427,7 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx) void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CRecoveredSig& recoveredSig) { - CInstantSendLockInfo islockInfo; + CInstantSendLock islock; { LOCK(cs); @@ -323,19 +436,19 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco return; } - islockInfo = std::move(it->second); + islock = std::move(it->second); creatingInstantSendLocks.erase(it); - txToCreatingInstantSendLocks.erase(islockInfo.islock.txid); + txToCreatingInstantSendLocks.erase(islock.txid); } - if (islockInfo.islock.txid != recoveredSig.msgHash) { + if (islock.txid != recoveredSig.msgHash) { LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: islock conflicts with %s, dropping own version", __func__, - islockInfo.islock.txid.ToString(), recoveredSig.msgHash.ToString()); + islock.txid.ToString(), recoveredSig.msgHash.ToString()); return; } - islockInfo.islock.sig = recoveredSig.sig; - ProcessInstantSendLock(-1, ::SerializeHash(islockInfo.islock), islockInfo.islock); + islock.sig = recoveredSig.sig; + ProcessInstantSendLock(-1, ::SerializeHash(islock), islock); } void CInstantSendManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) @@ -365,7 +478,10 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq auto hash = ::SerializeHash(islock); LOCK(cs); - if (pendingInstantSendLocks.count(hash) || finalInstantSendLocks.count(hash)) { + if (db.GetInstantSendLockByHash(hash) != nullptr) { + return; + } + if (pendingInstantSendLocks.count(hash)) { return; } @@ -509,23 +625,20 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has g_connman->RemoveAskFor(hash); } - CInstantSendLockInfo islockInfo; - islockInfo.time = GetTimeMillis(); - islockInfo.islock = islock; - islockInfo.islockHash = hash; - + CTransactionRef tx; uint256 hashBlock; // we ignore failure here as we must be able to propagate the lock even if we don't have the TX locally - if (GetTransaction(islock.txid, islockInfo.tx, Params().GetConsensus(), hashBlock)) { + if (GetTransaction(islock.txid, tx, Params().GetConsensus(), hashBlock)) { if (!hashBlock.IsNull()) { + const CBlockIndex* pindexMined; { LOCK(cs_main); - islockInfo.pindexMined = mapBlockIndex.at(hashBlock); + pindexMined = mapBlockIndex.at(hashBlock); } // Let's see if the TX that was locked by this islock is already mined in a ChainLocked block. If yes, // we can simply ignore the islock, as the ChainLock implies locking of all TXs in that chain - if (llmq::chainLocksHandler->HasChainLock(islockInfo.pindexMined->nHeight, islockInfo.pindexMined->GetBlockHash())) { + if (llmq::chainLocksHandler->HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) { LogPrint("instantsend", "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__, islock.txid.ToString(), hash.ToString(), hashBlock.ToString(), from); return; @@ -535,36 +648,29 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has { LOCK(cs); - auto e = finalInstantSendLocks.emplace(hash, islockInfo); - if (!e.second) { - return; - } - auto islockInfoPtr = &e.first->second; - - creatingInstantSendLocks.erase(islockInfoPtr->islock.GetRequestId()); - txToCreatingInstantSendLocks.erase(islockInfoPtr->islock.txid); LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: processsing islock, peer=%d\n", __func__, islock.txid.ToString(), hash.ToString(), from); - if (!txToInstantSendLock.emplace(islock.txid, islockInfoPtr).second) { - LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: duplicate islock, other islock=%s, peer=%d\n", __func__, - islock.txid.ToString(), hash.ToString(), txToInstantSendLock[islock.txid]->islockHash.ToString(), from); - txToInstantSendLock.erase(hash); + creatingInstantSendLocks.erase(islock.GetRequestId()); + txToCreatingInstantSendLocks.erase(islock.txid); + + CInstantSendLockPtr otherIsLock; + if (db.GetInstantSendLockByHash(hash)) { return; } - for (size_t i = 0; i < islock.inputs.size(); i++) { - auto& in = islock.inputs[i]; - if (!inputToInstantSendLock.emplace(in, islockInfoPtr).second) { + if (otherIsLock = db.GetInstantSendLockByTxid(islock.txid)) { + LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: duplicate islock, other islock=%s, peer=%d\n", __func__, + islock.txid.ToString(), hash.ToString(), ::SerializeHash(*otherIsLock).ToString(), from); + } + for (auto& in : islock.inputs) { + if (otherIsLock = db.GetInstantSendLockByInput(in)) { LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: conflicting input in islock. input=%s, other islock=%s, peer=%d\n", __func__, - islock.txid.ToString(), hash.ToString(), in.ToStringShort(), inputToInstantSendLock[in]->islockHash.ToString(), from); - txToInstantSendLock.erase(hash); - for (size_t j = 0; j < i; j++) { - inputToInstantSendLock.erase(islock.inputs[j]); - } - return; + islock.txid.ToString(), hash.ToString(), in.ToStringShort(), ::SerializeHash(*otherIsLock).ToString(), from); } } + + db.WriteNewInstantSendLock(hash, islock); } CInv inv(MSG_ISLOCK, hash); @@ -573,10 +679,10 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has RemoveMempoolConflictsForLock(hash, islock); RetryLockMempoolTxs(islock.txid); - UpdateWalletTransaction(islock.txid); + UpdateWalletTransaction(islock.txid, tx); } -void CInstantSendManager::UpdateWalletTransaction(const uint256& txid) +void CInstantSendManager::UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx) { #ifdef ENABLE_WALLET if (!pwalletMain) { @@ -595,16 +701,9 @@ void CInstantSendManager::UpdateWalletTransaction(const uint256& txid) } #endif - LOCK(cs); - auto it = txToInstantSendLock.find(txid); - if (it == txToInstantSendLock.end()) { - return; + if (tx) { + GetMainSignals().NotifyTransactionLock(*tx); } - if (it->second->tx == nullptr) { - return; - } - - GetMainSignals().NotifyTransactionLock(*it->second->tx); } void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock) @@ -613,24 +712,6 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn return; } - { - LOCK(cs); - auto it = txToInstantSendLock.find(tx.GetHash()); - if (it == txToInstantSendLock.end()) { - return; - } - auto islockInfo = it->second; - if (islockInfo->tx == nullptr) { - islockInfo->tx = MakeTransactionRef(tx); - } - - if (posInBlock == CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK) { - UpdateISLockMinedBlock(islockInfo, nullptr); - return; - } - UpdateISLockMinedBlock(islockInfo, pindex); - } - if (IsLocked(tx.GetHash())) { RetryLockMempoolTxs(tx.GetHash()); } @@ -638,80 +719,58 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindex) { + uint256 lastChainLockBlock; { LOCK(cs); + db.GetLastChainLockBlock(); + } - // Let's find all islocks that correspond to TXs which are part of the freshly ChainLocked chain and then delete - // the islocks. We do this because the ChainLocks imply locking and thus it's not needed to further track - // or propagate the islocks - std::unordered_set toDelete; - while (pindex && pindex != pindexLastChainLock) { - auto its = blockToInstantSendLocks.equal_range(pindex->GetBlockHash()); - while (its.first != its.second) { - auto islockInfo = its.first->second; - LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: removing islock as it got ChainLocked in block %s\n", __func__, - islockInfo->islock.txid.ToString(), islockInfo->islockHash.ToString(), pindex->GetBlockHash().ToString()); - toDelete.emplace(its.first->second->islockHash); - ++its.first; + // Let's find all islocks that correspond to TXs which are part of the freshly ChainLocked chain and then delete + // the islocks. We do this because the ChainLocks imply locking and thus it's not needed to further track + // or propagate the islocks + std::unordered_set toDelete; + while (pindex && pindex->GetBlockHash() != lastChainLockBlock) { + CBlock block; + { + if (!ReadBlockFromDisk(block, pindex, Params().GetConsensus())) { + pindex = pindex->pprev; + continue; } + } - pindex = pindex->pprev; + LOCK(cs); + for (const auto& tx : block.vtx) { + auto islock = db.GetInstantSendLockByTxid(tx->GetHash()); + if (!islock) { + continue; + } + auto hash = ::SerializeHash(*islock); + LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: removing islock as it got ChainLocked in block %s\n", __func__, + islock->txid.ToString(), hash.ToString(), pindex->GetBlockHash().ToString()); + RemoveFinalISLock(hash, islock); } - pindexLastChainLock = pindex; + pindex = pindex->pprev; + } - for (auto& islockHash : toDelete) { - RemoveFinalISLock(islockHash); - } + { + LOCK(cs); + db.WriteLastChainLockBlock(pindex ? pindex->GetBlockHash() : uint256()); } RetryLockMempoolTxs(uint256()); } -void CInstantSendManager::UpdateISLockMinedBlock(llmq::CInstantSendLockInfo* islockInfo, const CBlockIndex* pindex) +void CInstantSendManager::RemoveFinalISLock(const uint256& hash, const CInstantSendLockPtr& islock) { AssertLockHeld(cs); - - if (islockInfo->pindexMined == pindex) { - return; - } - - if (islockInfo->pindexMined) { - auto its = blockToInstantSendLocks.equal_range(islockInfo->pindexMined->GetBlockHash()); - while (its.first != its.second) { - if (its.first->second == islockInfo) { - its.first = blockToInstantSendLocks.erase(its.first); - } else { - ++its.first; - } - } - } - - if (pindex) { - blockToInstantSendLocks.emplace(pindex->GetBlockHash(), islockInfo); - } - - islockInfo->pindexMined = pindex; -} -void CInstantSendManager::RemoveFinalISLock(const uint256& hash) -{ - AssertLockHeld(cs); + db.RemoveInstantSendLock(hash, islock); - auto it = finalInstantSendLocks.find(hash); - if (it == finalInstantSendLocks.end()) { - return; - } - auto& islockInfo = it->second; - - txToInstantSendLock.erase(islockInfo.islock.txid); - for (auto& in : islockInfo.islock.inputs) { + for (auto& in : islock->inputs) { auto inputRequestId = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in)); inputRequestIds.erase(inputRequestId); - inputToInstantSendLock.erase(in); } - UpdateISLockMinedBlock(&islockInfo, nullptr); - finalInstantSendLocks.erase(it); } void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) @@ -798,7 +857,7 @@ bool CInstantSendManager::AlreadyHave(const CInv& inv) } LOCK(cs); - return finalInstantSendLocks.count(inv.hash) != 0 || pendingInstantSendLocks.count(inv.hash) != 0; + return db.GetInstantSendLockByHash(inv.hash) != nullptr || pendingInstantSendLocks.count(inv.hash) != 0; } bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret) @@ -808,11 +867,11 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CI } LOCK(cs); - auto it = finalInstantSendLocks.find(hash); - if (it == finalInstantSendLocks.end()) { + auto islock = db.GetInstantSendLockByHash(hash); + if (!islock) { return false; } - ret = it->second.islock; + ret = *islock; return true; } @@ -823,7 +882,7 @@ bool CInstantSendManager::IsLocked(const uint256& txHash) } LOCK(cs); - return txToInstantSendLock.count(txHash) != 0; + return db.GetInstantSendLockByTxid(txHash) != nullptr; } bool CInstantSendManager::IsConflicted(const CTransaction& tx) @@ -841,13 +900,13 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC LOCK(cs); for (const auto& in : tx.vin) { - auto it = inputToInstantSendLock.find(in.prevout); - if (it == inputToInstantSendLock.end()) { + auto otherIsLock = db.GetInstantSendLockByInput(in.prevout); + if (!otherIsLock) { continue; } - if (it->second->islock.txid != tx.GetHash()) { - retConflictTxHash = it->second->islock.txid; + if (otherIsLock->txid != tx.GetHash()) { + retConflictTxHash = otherIsLock->txid; return true; } } diff --git a/src/llmq/quorums_instantsend.h b/src/llmq/quorums_instantsend.h index 91fd2be8db791..8eb763b50fc36 100644 --- a/src/llmq/quorums_instantsend.h +++ b/src/llmq/quorums_instantsend.h @@ -8,6 +8,7 @@ #include "quorums_signing.h" #include "coins.h" +#include "unordered_lru_cache.h" #include "primitives/transaction.h" #include @@ -39,19 +40,29 @@ class CInstantSendLock uint256 GetRequestId() const; }; -class CInstantSendLockInfo +typedef std::shared_ptr CInstantSendLockPtr; + +class CInstantSendDb { +private: + CDBWrapper& db; + + unordered_lru_cache islockCache; + unordered_lru_cache txidCache; + unordered_lru_cache outpointCache; + public: - // might be nullptr when islock is received before the TX itself - CTransactionRef tx; - CInstantSendLock islock; - // only valid when recovered sig was received - uint256 islockHash; - // time when it was created/received - int64_t time; - - // might be null initially (when TX was not mined yet) and will later be filled by SyncTransaction - const CBlockIndex* pindexMined{nullptr}; + CInstantSendDb(CDBWrapper& _db) : db(_db) {} + + void WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock); + void RemoveInstantSendLock(const uint256& hash, CInstantSendLockPtr islock); + + CInstantSendLockPtr GetInstantSendLockByHash(const uint256& hash); + CInstantSendLockPtr GetInstantSendLockByTxid(const uint256& txid); + CInstantSendLockPtr GetInstantSendLockByInput(const COutPoint& outpoint); + + void WriteLastChainLockBlock(const uint256& hashBlock); + uint256 GetLastChainLockBlock(); }; class CInstantSendManager : public CRecoveredSigsListener @@ -59,6 +70,7 @@ class CInstantSendManager : public CRecoveredSigsListener private: CCriticalSection cs; CScheduler* scheduler; + CInstantSendDb db; /** * Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an @@ -71,27 +83,16 @@ class CInstantSendManager : public CRecoveredSigsListener * recovered signatures for all inputs of a TX. At the same time, we initiate signing of our sigshare for the islock. * When the recovered sig for the islock later arrives, we can finish the islock and propagate it. */ - std::unordered_map creatingInstantSendLocks; + std::unordered_map creatingInstantSendLocks; // maps from txid to the in-progress islock - std::unordered_map txToCreatingInstantSendLocks; - - /** - * These are the final islocks, indexed by their own hash. The other maps are used to get from TXs, inputs and blocks - * to islocks. - */ - std::unordered_map finalInstantSendLocks; - std::unordered_map txToInstantSendLock; - std::unordered_map inputToInstantSendLock; - std::unordered_multimap blockToInstantSendLocks; - - const CBlockIndex* pindexLastChainLock{nullptr}; + std::unordered_map txToCreatingInstantSendLocks; // Incoming and not verified yet std::unordered_map> pendingInstantSendLocks; bool hasScheduledProcessPending{false}; public: - CInstantSendManager(CScheduler* _scheduler); + CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb); ~CInstantSendManager(); void RegisterAsRecoveredSigsListener(); @@ -116,12 +117,11 @@ class CInstantSendManager : public CRecoveredSigsListener bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan); void ProcessPendingInstantSendLocks(); void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock); - void UpdateWalletTransaction(const uint256& txid); + void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx); void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock); void NotifyChainLock(const CBlockIndex* pindex); - void UpdateISLockMinedBlock(CInstantSendLockInfo* islockInfo, const CBlockIndex* pindex); - void RemoveFinalISLock(const uint256& hash); + void RemoveFinalISLock(const uint256& hash, const CInstantSendLockPtr& islock); void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock); void RetryLockMempoolTxs(const uint256& lockedParentTx);