diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 36315de564f0f..9652d2d59ca3c 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -26,10 +26,14 @@ void CSigShare::UpdateKey() key.second = quorumMember; } +std::string CSigSesAnn::ToString() const +{ + return strprintf("sessionId=%d, llmqType=%d, quorumHash=%s, id=%s, msgHash=%s", + sessionId, llmqType, quorumHash.ToString(), id.ToString(), msgHash.ToString()); +} + void CSigSharesInv::Merge(const CSigSharesInv& inv2) { - assert(llmqType == inv2.llmqType); - assert(signHash == inv2.signHash); for (size_t i = 0; i < inv.size(); i++) { if (inv2.inv[i]) { inv[i] = inv2.inv[i]; @@ -44,7 +48,7 @@ size_t CSigSharesInv::CountSet() const std::string CSigSharesInv::ToString() const { - std::string str = strprintf("signHash=%s, inv=(", signHash.ToString()); + std::string str = "("; bool first = true; for (size_t i = 0; i < inv.size(); i++) { if (!inv[i]) { @@ -61,11 +65,8 @@ std::string CSigSharesInv::ToString() const return str; } -void CSigSharesInv::Init(Consensus::LLMQType _llmqType, const uint256& _signHash) +void CSigSharesInv::Init(Consensus::LLMQType _llmqType) { - llmqType = _llmqType; - signHash = _signHash; - size_t llmqSize = (size_t)(Params().GetConsensus().llmqs.at(_llmqType).size); inv.resize(llmqSize, false); } @@ -82,68 +83,94 @@ void CSigSharesInv::Set(uint16_t quorumMember, bool v) inv[quorumMember] = v; } -CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash) +CSigSharesInv CBatchedSigShares::ToInv(Consensus::LLMQType llmqType) const { - auto& s = sessions[signHash]; - if (s.announced.inv.empty()) { - s.announced.Init(llmqType, signHash); - s.requested.Init(llmqType, signHash); - s.knows.Init(llmqType, signHash); - } else { - assert(s.announced.llmqType == llmqType); - assert(s.requested.llmqType == llmqType); - assert(s.knows.llmqType == llmqType); + CSigSharesInv inv; + inv.Init(llmqType); + for (size_t i = 0; i < sigShares.size(); i++) { + inv.inv[sigShares[i].first] = true; } - return s; + return inv; } -void CSigSharesNodeState::MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv) +template +static void InitSession(CSigSharesNodeState::Session& s, const uint256& signHash, T& from) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).announced.Merge(inv); + s.llmqType = (Consensus::LLMQType)from.llmqType; + s.quorumHash = from.quorumHash; + s.id = from.id; + s.msgHash = from.msgHash; + s.signHash = signHash; + s.announced.Init((Consensus::LLMQType)from.llmqType); + s.requested.Init((Consensus::LLMQType)from.llmqType); + s.knows.Init((Consensus::LLMQType)from.llmqType); } -void CSigSharesNodeState::MarkRequested(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromShare(const llmq::CSigShare& sigShare) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).requested.Merge(inv); + auto& s = sessions[sigShare.GetSignHash()]; + if (s.announced.inv.empty()) { + InitSession(s, sigShare.GetSignHash(), sigShare); + } + return s; } -void CSigSharesNodeState::MarkKnows(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromAnn(const llmq::CSigSesAnn& ann) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).knows.Merge(inv); + auto signHash = CLLMQUtils::BuildSignHash((Consensus::LLMQType)ann.llmqType, ann.quorumHash, ann.id, ann.msgHash); + auto& s = sessions[signHash]; + if (s.announced.inv.empty()) { + InitSession(s, signHash, ann); + } + return s; } -void CSigSharesNodeState::MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionBySignHash(const uint256& signHash) { - GetOrCreateSession(llmqType, signHash).announced.Set(quorumMember, true); + auto it = sessions.find(signHash); + if (it == sessions.end()) { + return nullptr; + } + return &it->second; } -void CSigSharesNodeState::MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionByRecvId(uint32_t sessionId) { - GetOrCreateSession(llmqType, signHash).requested.Set(quorumMember, true); + auto it = sessionByRecvId.find(sessionId); + if (it == sessionByRecvId.end()) { + return nullptr; + } + return it->second; } -void CSigSharesNodeState::MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +bool CSigSharesNodeState::GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo) { - GetOrCreateSession(llmqType, signHash).knows.Set(quorumMember, true); + auto s = GetSessionByRecvId(sessionId); + if (!s) { + return false; + } + retInfo.recvSessionId = sessionId; + retInfo.llmqType = s->llmqType; + retInfo.quorumHash = s->quorumHash; + retInfo.id = s->id; + retInfo.msgHash = s->msgHash; + retInfo.signHash = s->signHash; + retInfo.quorum = s->quorum; + + return true; } void CSigSharesNodeState::RemoveSession(const uint256& signHash) { - sessions.erase(signHash); + auto it = sessions.find(signHash); + if (it != sessions.end()) { + sessionByRecvId.erase(it->second.recvSessionId); + sessions.erase(it); + } requestedSigShares.EraseAllForSignHash(signHash); pendingIncomingSigShares.EraseAllForSignHash(signHash); } -CSigSharesInv CBatchedSigShares::ToInv() const -{ - CSigSharesInv inv; - inv.Init((Consensus::LLMQType)llmqType, CLLMQUtils::BuildSignHash(*this)); - for (size_t i = 0; i < sigShares.size(); i++) { - inv.inv[sigShares[i].first] = true; - } - return inv; -} - ////////////////////// CSigSharesManager::CSigSharesManager() @@ -201,7 +228,11 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma return; } - if (strCommand == NetMsgType::QSIGSHARESINV) { + if (strCommand == NetMsgType::QSIGSESANN) { + CSigSesAnn ann; + vRecv >> ann; + ProcessMessageSigSesAnn(pfrom, ann, connman); + } else if (strCommand == NetMsgType::QSIGSHARESINV) { CSigSharesInv inv; vRecv >> inv; ProcessMessageSigSharesInv(pfrom, inv, connman); @@ -216,14 +247,42 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma } } -bool CSigSharesManager::VerifySigSharesInv(NodeId from, const CSigSharesInv& inv) +void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) { - Consensus::LLMQType llmqType = (Consensus::LLMQType)inv.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType) || inv.signHash.IsNull()) { - BanNode(from); - return false; + auto llmqType = (Consensus::LLMQType)ann.llmqType; + if (!Params().GetConsensus().llmqs.count(llmqType)) { + BanNode(pfrom->id); + return; } + if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) { + BanNode(pfrom->id); + return; + } + + LogPrint("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->id); + + auto quorum = quorumManager->GetQuorum(llmqType, ann.quorumHash); + if (!quorum) { + // TODO should we ban here? + LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, + ann.quorumHash.ToString(), pfrom->id); + return; + } + + auto signHash = CLLMQUtils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash); + LOCK(cs); + auto& nodeState = nodeStates[pfrom->id]; + auto& session = nodeState.GetOrCreateSessionFromAnn(ann); + nodeState.sessionByRecvId.erase(session.recvSessionId); + nodeState.sessionByRecvId.erase(ann.sessionId); + session.recvSessionId = ann.sessionId; + session.quorum = quorum; + nodeState.sessionByRecvId.emplace(ann.sessionId, &session); +} + +bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv) +{ if (!fMasternodeMode || activeMasternodeInfo.proTxHash.IsNull()) { return false; } @@ -239,46 +298,71 @@ bool CSigSharesManager::VerifySigSharesInv(NodeId from, const CSigSharesInv& inv void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->id, inv)) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->id, inv.sessionId, sessionInfo)) { + return; + } + + if (!VerifySigSharesInv(pfrom->id, sessionInfo.llmqType, inv)) { return; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { return; } - LogPrint("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->id); + LogPrint("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->id); LOCK(cs); auto& nodeState = nodeStates[pfrom->id]; - nodeState.MarkAnnounced(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return; + } + session->announced.Merge(inv); + session->knows.Merge(inv); } void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->id, inv)) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->id, inv.sessionId, sessionInfo)) { + return; + } + + if (!VerifySigSharesInv(pfrom->id, sessionInfo.llmqType, inv)) { return; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { return; } - LogPrint("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->id); + LogPrint("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->id); LOCK(cs); auto& nodeState = nodeStates[pfrom->id]; - nodeState.MarkRequested(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return; + } + session->requested.Merge(inv); + session->knows.Merge(inv); } void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->id, batchedSigShares.sessionId, sessionInfo)) { + return; + } + bool ban = false; - if (!PreVerifyBatchedSigShares(pfrom->id, batchedSigShares, ban)) { + if (!PreVerifyBatchedSigShares(pfrom->id, sessionInfo, batchedSigShares, ban)) { if (ban) { BanNode(pfrom->id); return; @@ -294,7 +378,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc auto& nodeState = nodeStates[pfrom->id]; for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { - CSigShare sigShare = batchedSigShares.RebuildSigShare(i); + CSigShare sigShare = RebuildSigShare(sessionInfo, batchedSigShares, i); nodeState.requestedSigShares.Erase(sigShare.GetKey()); // TODO track invalid sig shares received for PoSe? @@ -314,8 +398,8 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } - LogPrint("llmq", "CSigSharesManager::%s -- shares=%d, new=%d, inv={%s}, node=%d\n", __func__, - batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv().ToString(), pfrom->id); + LogPrint("llmq", "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->id); if (sigShares.empty()) { return; @@ -328,35 +412,22 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } -bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan) +bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) { retBan = false; - auto llmqType = (Consensus::LLMQType)batchedSigShares.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType)) { - retBan = true; - return false; - } - - CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, batchedSigShares.quorumHash); - if (!quorum) { - // TODO should we ban here? - LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); - return false; - } - if (!CLLMQUtils::IsQuorumActive(llmqType, quorum->quorumHash)) { + if (!CLLMQUtils::IsQuorumActive(session.llmqType, session.quorum->quorumHash)) { // quorum is too old return false; } - if (!quorum->IsMember(activeMasternodeInfo.proTxHash)) { + if (!session.quorum->IsMember(activeMasternodeInfo.proTxHash)) { // we're not a member so we can't verify it (we actually shouldn't have received it) return false; } - if (quorum->quorumVvec == nullptr) { + if (session.quorum->quorumVvec == nullptr) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrintf("CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); + session.quorumHash.ToString(), nodeId); return false; } @@ -369,12 +440,12 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS return false; } - if (quorumMember >= quorum->members.size()) { + if (quorumMember >= session.quorum->members.size()) { LogPrintf("CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); retBan = true; return false; } - if (!quorum->validMembers[quorumMember]) { + if (!session.quorum->validMembers[quorumMember]) { LogPrintf("CSigSharesManager::%s -- quorumMember not valid\n", __func__); retBan = true; return false; @@ -589,8 +660,10 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare if (!quorumNodes.count(p.first) && !p.second.interestedIn.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { continue; } - p.second.MarkRequested((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); - p.second.MarkKnows((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); + auto& session = p.second.GetOrCreateSessionFromShare(sigShare); + session.quorum = quorum; + session.requested.Set(sigShare.quorumMember, true); + session.knows.Set(sigShare.quorumMember, true); } } @@ -753,7 +826,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_mapllmqType; - batchedSigShares.quorumHash = sigShare->quorumHash; - batchedSigShares.id = sigShare->id; - batchedSigShares.msgHash = sigShare->msgHash; - } batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare->sigShare); } @@ -861,7 +928,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_mapllmqType, signHash); + auto& session = nodeState.GetOrCreateSessionFromShare(*sigShare); if (session.knows.inv[quorumMember]) { // he already knows that one @@ -870,7 +937,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_mapllmqType, signHash); + inv.Init((Consensus::LLMQType)sigShare->llmqType); } inv.inv[quorumMember] = true; session.knows.inv[quorumMember] = true; @@ -894,12 +961,48 @@ bool CSigSharesManager::SendMessages() std::unordered_map> sigSharesToRequest; std::unordered_map> sigSharesToSend; std::unordered_map> sigSharesToAnnounce; + std::unordered_map> sigSessionAnnouncements; + + auto addSigSesAnnIfNeeded = [&](NodeId nodeId, const uint256& signHash) { + auto& nodeState = nodeStates[nodeId]; + auto session = nodeState.GetSessionBySignHash(signHash); + assert(session); + if (session->sendSessionId == (uint32_t)-1) { + session->sendSessionId = nodeState.nextSendSessionId++; + + CSigSesAnn sigSesAnn; + sigSesAnn.sessionId = session->sendSessionId; + sigSesAnn.llmqType = (uint8_t)session->llmqType; + sigSesAnn.quorumHash = session->quorumHash; + sigSesAnn.id = session->id; + sigSesAnn.msgHash = session->msgHash; + + sigSessionAnnouncements[nodeId].emplace_back(sigSesAnn); + } + return session->sendSessionId; + }; { LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigSharesToSend); CollectSigSharesToAnnounce(sigSharesToAnnounce); + + for (auto& p : sigSharesToRequest) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToSend) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToAnnounce) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } } bool didSend = false; @@ -907,12 +1010,22 @@ bool CSigSharesManager::SendMessages() g_connman->ForEachNode([&](CNode* pnode) { CNetMsgMaker msgMaker(pnode->GetSendVersion()); + auto it1 = sigSessionAnnouncements.find(pnode->id); + if (it1 != sigSessionAnnouncements.end()) { + for (auto& sigSesAnn : it1->second) { + LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n", + CLLMQUtils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->id); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn), false); + didSend = true; + } + } + auto it = sigSharesToRequest.find(pnode->id); if (it != sigSharesToRequest.end()) { for (auto& p : it->second) { assert(p.second.CountSet() != 0); - LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", - p.second.ToString(), pnode->id); + LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->id); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false); didSend = true; } @@ -922,8 +1035,12 @@ bool CSigSharesManager::SendMessages() if (jt != sigSharesToSend.end()) { for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); - LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", - p.second.ToInv().ToString(), pnode->id); + if (LogAcceptCategory("llmq")) { + LOCK(cs); + auto session = nodeStates[pnode->id].GetSessionBySignHash(p.first); + LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->id); + } g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false); didSend = true; } @@ -933,8 +1050,8 @@ bool CSigSharesManager::SendMessages() if (kt != sigSharesToAnnounce.end()) { for (auto& p : kt->second) { assert(p.second.CountSet() != 0); - LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", - p.second.ToString(), pnode->id); + LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->id); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false); didSend = true; } @@ -950,6 +1067,27 @@ bool CSigSharesManager::SendMessages() return didSend; } +bool CSigSharesManager::GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) +{ + LOCK(cs); + return nodeStates[nodeId].GetSessionInfoByRecvId(sessionId, retInfo); +} + +CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx) +{ + assert(idx < batchedSigShares.sigShares.size()); + auto& s = batchedSigShares.sigShares[idx]; + CSigShare sigShare; + sigShare.llmqType = session.llmqType; + sigShare.quorumHash = session.quorumHash; + sigShare.quorumMember = s.first; + sigShare.id = session.id; + sigShare.msgHash = session.msgHash; + sigShare.sigShare = s.second; + sigShare.UpdateKey(); + return sigShare; +} + void CSigSharesManager::Cleanup() { int64_t now = GetTimeMillis(); @@ -1201,15 +1339,15 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const sigShare.sigShare.SetSig(skShare.Sign(signHash)); if (!sigShare.sigShare.GetSig().IsValid()) { - LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. signHahs=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); return; } sigShare.UpdateKey(); - LogPrintf("CSigSharesManager::%s -- signed sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); ProcessSigShare(-1, sigShare, *g_connman, quorum); } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 3f11c98b5ed13..c9c4723b4a8c4 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -56,11 +56,36 @@ class CSigShare } }; -class CSigSharesInv +// Nodes will first announce a signing session with a sessionId to be used in all future P2P messages related to that +// session. We locally keep track of the mapping for each node. We also assign new sessionIds for outgoing sessions +// and send QSIGSESANN messages appropriately. All values except the max value for uint32_t are valid as sessionId +class CSigSesAnn { public: + uint32_t sessionId{(uint32_t)-1}; uint8_t llmqType; - uint256 signHash; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + + ADD_SERIALIZE_METHODS + + template + inline void SerializationOp(Stream& s, Operation ser_action) { + READWRITE(VARINT(sessionId)); + READWRITE(llmqType); + READWRITE(quorumHash); + READWRITE(id); + READWRITE(msgHash); + } + + std::string ToString() const; +}; + +class CSigSharesInv +{ +public: + uint32_t sessionId{(uint32_t)-1}; std::vector inv; public: @@ -69,20 +94,14 @@ class CSigSharesInv template inline void SerializationOp(Stream& s, Operation ser_action) { - READWRITE(llmqType); - - auto& consensus = Params().GetConsensus(); - auto it = consensus.llmqs.find((Consensus::LLMQType)llmqType); - if (it == consensus.llmqs.end()) { - throw std::ios_base::failure("invalid llmqType"); - } - const auto& params = it->second; + uint64_t invSize = inv.size(); - READWRITE(signHash); - READWRITE(AUTOBITSET(inv, (size_t)params.size)); + READWRITE(VARINT(sessionId)); + READWRITE(COMPACTSIZE(invSize)); + READWRITE(AUTOBITSET(inv, (size_t)invSize)); } - void Init(Consensus::LLMQType _llmqType, const uint256& _signHash); + void Init(Consensus::LLMQType _llmqType); bool IsSet(uint16_t quorumMember) const; void Set(uint16_t quorumMember, bool v); void Merge(const CSigSharesInv& inv2); @@ -95,10 +114,7 @@ class CSigSharesInv class CBatchedSigShares { public: - uint8_t llmqType; - uint256 quorumHash; - uint256 id; - uint256 msgHash; + uint32_t sessionId{(uint32_t)-1}; std::vector> sigShares; public: @@ -107,29 +123,11 @@ class CBatchedSigShares template inline void SerializationOp(Stream& s, Operation ser_action) { - READWRITE(llmqType); - READWRITE(quorumHash); - READWRITE(id); - READWRITE(msgHash); + READWRITE(VARINT(sessionId)); READWRITE(sigShares); } - CSigShare RebuildSigShare(size_t idx) const - { - assert(idx < sigShares.size()); - auto& s = sigShares[idx]; - CSigShare sigShare; - sigShare.llmqType = llmqType; - sigShare.quorumHash = quorumHash; - sigShare.quorumMember = s.first; - sigShare.id = id; - sigShare.msgHash = msgHash; - sigShare.sigShare = s.second; - sigShare.UpdateKey(); - return sigShare; - } - - CSigSharesInv ToInv() const; + CSigSharesInv ToInv(Consensus::LLMQType llmqType) const; }; template @@ -280,7 +278,31 @@ class SigShareMap class CSigSharesNodeState { public: + // Used to avoid holding locks too long + struct SessionInfo + { + uint32_t recvSessionId; + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + }; + struct Session { + uint32_t recvSessionId{(uint32_t)-1}; + uint32_t sendSessionId{(uint32_t)-1}; + + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + CSigSharesInv announced; CSigSharesInv requested; CSigSharesInv knows; @@ -288,6 +310,9 @@ class CSigSharesNodeState // TODO limit number of sessions per node std::unordered_map sessions; + std::unordered_map sessionByRecvId; + uint32_t nextSendSessionId{1}; + SigShareMap pendingIncomingSigShares; SigShareMap requestedSigShares; @@ -297,15 +322,11 @@ class CSigSharesNodeState bool banned{false}; - Session& GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash); - - void MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv); - void MarkRequested(const uint256& signHash, const CSigSharesInv& inv); - void MarkKnows(const uint256& signHash, const CSigSharesInv& inv); - - void MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); + Session& GetOrCreateSessionFromShare(const CSigShare& sigShare); + Session& GetOrCreateSessionFromAnn(const CSigSesAnn& ann); + Session* GetSessionBySignHash(const uint256& signHash); + Session* GetSessionByRecvId(uint32_t sessionId); + bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo); void RemoveSession(const uint256& signHash); }; @@ -357,12 +378,13 @@ class CSigSharesManager : public CRecoveredSigsListener void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); private: + void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); - bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv); - bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); + bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv); + bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, @@ -378,6 +400,9 @@ class CSigSharesManager : public CRecoveredSigsListener void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman); private: + bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); + CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx); + void Cleanup(); void RemoveSigSharesForSession(const uint256& signHash); void RemoveBannedNodeStates(); diff --git a/src/protocol.cpp b/src/protocol.cpp index 114f1958a442f..6df80cae74fa4 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -65,6 +65,7 @@ const char *QJUSTIFICATION="qjustify"; const char *QPCOMMITMENT="qpcommit"; const char *QWATCH="qwatch"; const char *QDEBUGSTATUS="qdebugstatus"; +const char *QSIGSESANN="qsigsesann"; const char *QSIGSHARESINV="qsigsinv"; const char *QGETSIGSHARES="qgetsigs"; const char *QBSIGSHARES="qbsigs"; @@ -165,6 +166,7 @@ const static std::string allNetMessageTypes[] = { NetMsgType::QPCOMMITMENT, NetMsgType::QWATCH, NetMsgType::QDEBUGSTATUS, + NetMsgType::QSIGSESANN, NetMsgType::QSIGSHARESINV, NetMsgType::QGETSIGSHARES, NetMsgType::QBSIGSHARES, diff --git a/src/protocol.h b/src/protocol.h index ae2c95b4d1b98..b4d6eab3eccaf 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -271,6 +271,7 @@ extern const char *QJUSTIFICATION; extern const char *QPCOMMITMENT; extern const char *QWATCH; extern const char *QDEBUGSTATUS; +extern const char *QSIGSESANN; extern const char *QSIGSHARESINV; extern const char *QGETSIGSHARES; extern const char *QBSIGSHARES;