Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dash llmq backports #2921

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
198 changes: 136 additions & 62 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,46 @@ bool CRecoveredSigsDb::HasRecoveredSig(Consensus::LLMQType llmqType, const uint2

bool CRecoveredSigsDb::HasRecoveredSigForId(Consensus::LLMQType llmqType, const uint256& id)
{
int64_t t = GetTimeMillis();

auto cacheKey = std::make_pair(llmqType, id);
{
LOCK(cs);
auto it = hasSigForIdCache.find(cacheKey);
if (it != hasSigForIdCache.end()) {
it->second.second = t;
return it->second.first;
}
}


auto k = std::make_tuple('r', (uint8_t)llmqType, id);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForIdCache.emplace(cacheKey, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash)
{
int64_t t = GetTimeMillis();

{
LOCK(cs);
auto it = hasSigForSessionCache.find(signHash);
if (it != hasSigForSessionCache.end()) {
it->second.second = t;
return it->second.first;
}
}

auto k = std::make_tuple('s', signHash);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForSessionCache.emplace(signHash, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash)
Expand Down Expand Up @@ -100,7 +132,8 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
batch.Write(k3, std::make_pair(recSig.llmqType, recSig.id));

// store by signHash
auto k4 = std::make_tuple('s', llmq::utils::BuildSignHash(recSig));
auto signHash = llmq::utils::BuildSignHash(recSig);
auto k4 = std::make_tuple('s', signHash);
batch.Write(k4, (uint8_t)1);

// remove the votedForId entry as we won't need it anymore
Expand All @@ -112,14 +145,48 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
batch.Write(k6, (uint8_t)1);

db.WriteBatch(batch);

{
int64_t t = GetTimeMillis();

LOCK(cs);
hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t);
hasSigForSessionCache[signHash] = std::make_pair(true, t);
}
}

template<typename K>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>>& m, size_t maxSize, size_t truncateThreshold)
{
typedef typename std::unordered_map<K, std::pair<bool, int64_t>> Map;
typedef typename Map::iterator Iterator;

if (m.size() <= truncateThreshold) {
return;
}

std::vector<Iterator> vec;
vec.reserve(m.size());
for (auto it = m.begin(); it != m.end(); ++it) {
vec.emplace_back(it);
}
// sort by last access time (descending order)
std::sort(vec.begin(), vec.end(), [](const Iterator& it1, const Iterator& it2) {
return it1->second.second > it2->second.second;
});

for (size_t i = maxSize; i < vec.size(); i++) {
m.erase(vec[i]);
}
}

void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
{
std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

static const uint256 maxUint256 = uint256S("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff");
auto start = std::make_tuple('t', (uint32_t)0, (uint8_t)0, uint256());
auto end = std::make_tuple('t', (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)0, uint256());
auto end = std::make_tuple('t', (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)255, maxUint256);
pcursor->Seek(start);

std::vector<std::pair<Consensus::LLMQType, uint256>> toDelete;
Expand Down Expand Up @@ -147,22 +214,33 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
}

CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT);
for (auto& e : toDelete) {
CRecoveredSig recSig;
if (!ReadRecoveredSig(e.first, e.second, recSig)) {
continue;
{
LOCK(cs);
for (auto& e : toDelete) {
CRecoveredSig recSig;
if (!ReadRecoveredSig(e.first, e.second, recSig)) {
continue;
}

auto signHash = llmq::utils::BuildSignHash(recSig);

auto k1 = std::make_tuple('r', recSig.llmqType, recSig.id);
auto k2 = std::make_tuple('r', recSig.llmqType, recSig.id, recSig.msgHash);
auto k3 = std::make_tuple('h', recSig.GetHash());
auto k4 = std::make_tuple('s', signHash);
auto k5 = std::make_tuple('v', recSig.llmqType, recSig.id);
batch.Erase(k1);
batch.Erase(k2);
batch.Erase(k3);
batch.Erase(k4);
batch.Erase(k5);

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
}

auto k1 = std::make_tuple('r', recSig.llmqType, recSig.id);
auto k2 = std::make_tuple('r', recSig.llmqType, recSig.id, recSig.msgHash);
auto k3 = std::make_tuple('h', recSig.GetHash());
auto k4 = std::make_tuple('s', llmq::utils::BuildSignHash(recSig));
auto k5 = std::make_tuple('v', recSig.llmqType, recSig.id);
batch.Erase(k1);
batch.Erase(k2);
batch.Erase(k3);
batch.Erase(k4);
batch.Erase(k5);
TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
}

for (auto& e : toDelete2) {
Expand Down Expand Up @@ -256,19 +334,15 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig&
return false;
}

CQuorumCPtr quorum;
{
LOCK(cs_main);
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recoveredSig.quorumHash);

quorum = quorumManager->GetQuorum(llmqType, recoveredSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recoveredSig.quorumHash.ToString(), nodeId);
return false;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
return false;
}
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recoveredSig.quorumHash.ToString(), nodeId);
return false;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
return false;
}

if (!recoveredSig.sig.IsValid()) {
Expand Down Expand Up @@ -311,49 +385,46 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
}
}

{
LOCK(cs_main);
for (auto& p : retSigShares) {
NodeId nodeId = p.first;
auto& v = p.second;

for (auto it = v.begin(); it != v.end();) {
auto& recSig = *it;

Consensus::LLMQType llmqType = (Consensus::LLMQType)recSig.llmqType;
auto quorumKey = std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.quorumHash);
if (!retQuorums.count(quorumKey)) {
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
LogPrintf("CSigningManager::%s -- quorum %s not active anymore, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}

retQuorums.emplace(quorumKey, quorum);
for (auto& p : retSigShares) {
NodeId nodeId = p.first;
auto& v = p.second;

for (auto it = v.begin(); it != v.end();) {
auto& recSig = *it;

Consensus::LLMQType llmqType = (Consensus::LLMQType)recSig.llmqType;
auto quorumKey = std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.quorumHash);
if (!retQuorums.count(quorumKey)) {
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
LogPrintf("CSigningManager::%s -- quorum %s not active anymore, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}

++it;
retQuorums.emplace(quorumKey, quorum);
}

++it;
}
}
}

void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{
std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
return;
return false;
}

// It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
Expand Down Expand Up @@ -399,6 +470,8 @@ void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
ProcessRecoveredSig(nodeId, recSig, quorum, connman);
}
}

return true;
}

// signature must be verified already
Expand Down Expand Up @@ -557,10 +630,11 @@ CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType
CBlockIndex* pindexStart;
{
LOCK(cs_main);
if (signHeight > chainActive.Height()) {
int startBlockHeight = signHeight - SIGN_HEIGHT_OFFSET;
if (startBlockHeight > chainActive.Height()) {
return nullptr;
}
pindexStart = chainActive[signHeight - SIGN_HEIGHT_OFFSET];
pindexStart = chainActive[startBlockHeight];
}

auto quorums = quorumManager->ScanQuorums(llmqType, pindexStart, poolSize);
Expand Down
22 changes: 21 additions & 1 deletion src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@
#include "net.h"
#include "sync.h"

#include <unordered_map>

namespace std {
template <>
struct hash<std::pair<Consensus::LLMQType, uint256>>
{
std::size_t operator()(const std::pair<Consensus::LLMQType, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
}

namespace llmq
{

Expand Down Expand Up @@ -67,9 +80,16 @@ class CRecoveredSig
// TODO implement caching to speed things up
class CRecoveredSigsDb
{
static const size_t MAX_CACHE_SIZE = 30000;
static const size_t MAX_CACHE_TRUNCATE_THRESHOLD = 50000;

private:
CDBWrapper db;

RecursiveMutex cs;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>> hasSigForSessionCache;

public:
CRecoveredSigsDb(bool fMemory);

Expand Down Expand Up @@ -138,7 +158,7 @@ class CSigningManager
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager

Expand Down
Loading
Loading