Skip to content

Commit

Permalink
Optimize DKG debug message processing for performance and lower bandw…
Browse files Browse the repository at this point in the history
…idth (#2672)

* Allow sub-batch verification in CBLSInsecureBatchVerifier

* Implement batch verification of CDKGDebugStatus messages

* Use uint8_t for statusBitset in CDKGDebugMemberStatus and CDKGDebugSessionStatus

No need to waste one byte per member and per LLMQ type.

* Reserve 4k of buffer for CSerializedNetMsg buffer

Profiling has shown that a lot of time is spent in resizing the data
vector when large messages are involved.

* Remove nHeight from CDKGDebugStatus

This field changes every block and causes all masternodes to propagate
its status for every block, even if nothing DKG related has changed.

* Leave out session statuses when we're not a member of that session

Otherwise MNs which are not members of DKG sessions will spam the network

* Remove receivedFinalCommitment from CDKGDebugSessionStatus

This is not bound to a session and thus is prone to spam the network when
final commitments are propagated in the finalization phase.

* Add "minableCommitments" to "quorum dkgstatus"

* Hold cs_main while calling GetMinableCommitment

* Abort processing of pending debug messages when spork18 gets disabled

* Don't ask for debug messages when we've already seen them

"statuses" only contains the current messages but none of the old messages,
so nodes kept re-requesting old messages.
  • Loading branch information
codablock authored and UdjinM6 committed Feb 1, 2019
1 parent 5d1c97d commit 18950f9
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 89 deletions.
14 changes: 11 additions & 3 deletions qa/rpc-tests/test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ def wait_for_quorum_phase(self, phase, check_received_messages, check_received_m
while time() - t < timeout:
all_ok = True
for mn in self.mninfo:
s = mn.node.quorum("dkgstatus")["session"]["llmq_10"]
s = mn.node.quorum("dkgstatus")["session"]
if "llmq_10" not in s:
all_ok = False
break
s = s["llmq_10"]
if "phase" not in s:
all_ok = False
break
Expand All @@ -467,8 +471,12 @@ def wait_for_quorum_commitment(self, timeout = 15):
while time() - t < timeout:
all_ok = True
for node in self.nodes:
s = node.quorum("dkgstatus")["session"]["llmq_10"]
if "receivedFinalCommitment" not in s or not s["receivedFinalCommitment"]:
s = node.quorum("dkgstatus")
if "minableCommitments" not in s:
all_ok = False
break
s = s["minableCommitments"]
if "llmq_10" not in s:
all_ok = False
break
if all_ok:
Expand Down
22 changes: 21 additions & 1 deletion src/bls/bls_batchverifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class CBLSInsecureBatchVerifier
typedef typename MessageMap::iterator MessageMapIterator;
typedef std::map<SourceId, std::vector<MessageMapIterator>> MessagesBySourceMap;

bool perMessageFallback;
size_t subBatchSize;

MessageMap messages;
MessagesBySourceMap messagesBySource;

Expand All @@ -33,15 +36,32 @@ class CBLSInsecureBatchVerifier
std::set<MessageId> badMessages;

public:
CBLSInsecureBatchVerifier(bool _perMessageFallback, size_t _subBatchSize = 0) :
perMessageFallback(_perMessageFallback),
subBatchSize(_subBatchSize)
{
}

void PushMessage(const SourceId& sourceId, const MessageId& msgId, const uint256& msgHash, const CBLSSignature& sig, const CBLSPublicKey& pubKey)
{
assert(sig.IsValid() && pubKey.IsValid());

auto it = messages.emplace(msgId, Message{msgId, msgHash, sig, pubKey}).first;
messagesBySource[sourceId].emplace_back(it);

if (subBatchSize != 0 && messages.size() >= subBatchSize) {
Verify();
ClearMessages();
}
}

void ClearMessages()
{
messages.clear();
messagesBySource.clear();
}

void Verify(bool perMessageFallback)
void Verify()
{
std::map<uint256, std::vector<MessageMapIterator>> byMessageHash;

Expand Down
8 changes: 0 additions & 8 deletions src/llmq/quorums_blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,6 @@ void CQuorumBlockProcessor::AddMinableCommitment(const CFinalCommitment& fqc)
}
}

quorumDKGDebugManager->UpdateLocalSessionStatus((Consensus::LLMQType)fqc.llmqType, [&](CDKGDebugSessionStatus& status) {
if (status.quorumHash != fqc.quorumHash || status.receivedFinalCommitment) {
return false;
}
status.receivedFinalCommitment = true;
return true;
});

// We only relay the new commitment if it's new or better then the old one
if (relay) {
CInv inv(MSG_QUORUM_FINAL_COMMITMENT, commitmentHash);
Expand Down
184 changes: 133 additions & 51 deletions src/llmq/quorums_debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "quorums_debug.h"

#include "activemasternode.h"
#include "bls/bls_batchverifier.h"
#include "chainparams.h"
#include "net.h"
#include "net_processing.h"
Expand Down Expand Up @@ -95,7 +96,6 @@ UniValue CDKGDebugSessionStatus::ToJson(int detailLevel) const
push(receivedComplaints, "receivedComplaints");
push(receivedJustifications, "receivedJustifications");
push(receivedPrematureCommitments, "receivedPrematureCommitments");
ret.push_back(Pair("receivedFinalCommitment", receivedFinalCommitment));

if (detailLevel == 2) {
UniValue arr(UniValue::VARR);
Expand All @@ -108,12 +108,9 @@ UniValue CDKGDebugSessionStatus::ToJson(int detailLevel) const
return ret;
}

CDKGDebugManager::CDKGDebugManager(CScheduler* scheduler)
CDKGDebugManager::CDKGDebugManager(CScheduler* _scheduler) :
scheduler(_scheduler)
{
for (const auto& p : Params().GetConsensus().llmqs) {
ResetLocalSessionStatus(p.first, uint256(), 0);
}

if (scheduler) {
scheduler->scheduleEvery([&]() {
SendLocalStatus();
Expand All @@ -131,73 +128,136 @@ void CDKGDebugManager::ProcessMessage(CNode* pfrom, const std::string& strComman
CDKGDebugStatus status;
vRecv >> status;

uint256 hash = ::SerializeHash(status);

{
LOCK(cs_main);
connman.RemoveAskFor(::SerializeHash(status));
connman.RemoveAskFor(hash);
}

bool ban = false;
if (!PreVerifyDebugStatusMessage(hash, status, ban)) {
if (ban) {
LOCK(cs_main);
Misbehaving(pfrom->id, 10);
return;
}
}

ProcessDebugStatusMessage(pfrom->id, status);
LOCK(cs);

pendingIncomingStatuses.emplace(hash, std::make_pair(std::move(status), pfrom->id));

ScheduleProcessPending();
}
}

void CDKGDebugManager::ProcessDebugStatusMessage(NodeId nodeId, llmq::CDKGDebugStatus& status)
bool CDKGDebugManager::PreVerifyDebugStatusMessage(const uint256& hash, llmq::CDKGDebugStatus& status, bool& retBan)
{
retBan = false;

auto dmn = deterministicMNManager->GetListAtChainTip().GetMN(status.proTxHash);
if (!dmn) {
if (nodeId != -1) {
LOCK(cs_main);
Misbehaving(nodeId, 10);
}
return;
retBan = true;
return false;
}

{
LOCK(cs);

if (!seenStatuses.emplace(hash, GetTimeMillis()).second) {
return false;
}

auto it = statusesForMasternodes.find(status.proTxHash);
if (it != statusesForMasternodes.end()) {
if (statuses[it->second].nTime >= status.nTime) {
// we know a more recent status already
return;
return false;
}
}
}

// check if all expected LLMQ types are present and valid
std::set<Consensus::LLMQType> llmqTypes;
// check if all present LLMQ types are valid
for (const auto& p : status.sessions) {
if (!Params().GetConsensus().llmqs.count((Consensus::LLMQType)p.first)) {
if (nodeId != -1) {
LOCK(cs_main);
Misbehaving(nodeId, 10);
}
return;
retBan = true;
return false;
}
const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)p.first);
if (p.second.llmqType != p.first || p.second.members.size() != (size_t)params.size) {
if (nodeId != -1) {
LOCK(cs_main);
Misbehaving(nodeId, 10);
}
return;
retBan = true;
return false;
}
llmqTypes.emplace((Consensus::LLMQType)p.first);
}
for (const auto& p : Params().GetConsensus().llmqs) {
if (!llmqTypes.count(p.first)) {
if (nodeId != -1) {
LOCK(cs_main);
Misbehaving(nodeId, 10);
}
return;

return true;
}

void CDKGDebugManager::ScheduleProcessPending()
{
AssertLockHeld(cs);

if (hasScheduledProcessPending) {
return;
}

scheduler->schedule([&] {
ProcessPending();
}, boost::chrono::system_clock::now() + boost::chrono::milliseconds(100));
}

void CDKGDebugManager::ProcessPending()
{
decltype(pendingIncomingStatuses) pend;

{
LOCK(cs);
hasScheduledProcessPending = false;
pend = std::move(pendingIncomingStatuses);
}

if (!sporkManager.IsSporkActive(SPORK_18_QUORUM_DEBUG_ENABLED)) {
return;
}

CBLSInsecureBatchVerifier<NodeId, uint256> batchVerifier(true, 8);
for (const auto& p : pend) {
const auto& hash = p.first;
const auto& status = p.second.first;
auto nodeId = p.second.second;
auto dmn = deterministicMNManager->GetListAtChainTip().GetMN(status.proTxHash);
if (!dmn) {
continue;
}
batchVerifier.PushMessage(nodeId, hash, status.GetSignHash(), status.sig, dmn->pdmnState->pubKeyOperator);
}

// TODO batch verification/processing
if (!status.sig.VerifyInsecure(dmn->pdmnState->pubKeyOperator, status.GetSignHash())) {
if (nodeId != -1) {
LOCK(cs_main);
Misbehaving(nodeId, 10);
batchVerifier.Verify();

if (!batchVerifier.badSources.empty()) {
LOCK(cs_main);
for (auto& nodeId : batchVerifier.badSources) {
Misbehaving(nodeId, 100);
}
}
for (const auto& p : pend) {
const auto& hash = p.first;
const auto& status = p.second.first;
auto nodeId = p.second.second;
if (batchVerifier.badMessages.count(p.first)) {
continue;
}

ProcessDebugStatusMessage(hash, status);
}
}

// status must have a validated signature
void CDKGDebugManager::ProcessDebugStatusMessage(const uint256& hash, const llmq::CDKGDebugStatus& status)
{
auto dmn = deterministicMNManager->GetListAtChainTip().GetMN(status.proTxHash);
if (!dmn) {
return;
}

Expand All @@ -208,8 +268,6 @@ void CDKGDebugManager::ProcessDebugStatusMessage(NodeId nodeId, llmq::CDKGDebugS
statusesForMasternodes.erase(it);
}

auto hash = ::SerializeHash(status);

statuses[hash] = status;
statusesForMasternodes[status.proTxHash] = hash;

Expand All @@ -222,7 +280,6 @@ UniValue CDKGDebugStatus::ToJson(int detailLevel) const
UniValue ret(UniValue::VOBJ);

ret.push_back(Pair("proTxHash", proTxHash.ToString()));
ret.push_back(Pair("height", (int)nHeight));
ret.push_back(Pair("time", nTime));
ret.push_back(Pair("timeStr", DateTimeStrFormat("%Y-%m-%d %H:%M:%S", nTime)));

Expand All @@ -248,7 +305,7 @@ bool CDKGDebugManager::AlreadyHave(const CInv& inv)
return true;
}

return statuses.count(inv.hash) != 0;
return statuses.count(inv.hash) != 0 || seenStatuses.count(inv.hash) != 0;
}

bool CDKGDebugManager::GetDebugStatus(const uint256& hash, llmq::CDKGDebugStatus& ret)
Expand Down Expand Up @@ -280,24 +337,37 @@ void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret)
ret.proTxHash = activeMasternodeInfo.proTxHash;
}

void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, const uint256& quorumHash, int quorumHeight)
void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType)
{
LOCK(cs);

auto& params = Params().GetConsensus().llmqs.at(llmqType);
auto it = localStatus.sessions.find(llmqType);
if (it == localStatus.sessions.end()) {
return;
}

localStatus.sessions.erase(it);
localStatus.nTime = GetAdjustedTime();
}

auto& session = localStatus.sessions[llmqType];
void CDKGDebugManager::InitLocalSessionStatus(Consensus::LLMQType llmqType, const uint256& quorumHash, int quorumHeight)
{
LOCK(cs);

auto it = localStatus.sessions.find(llmqType);
if (it == localStatus.sessions.end()) {
it = localStatus.sessions.emplace((uint8_t)llmqType, CDKGDebugSessionStatus()).first;
}

auto& params = Params().GetConsensus().llmqs.at(llmqType);
auto& session = it->second;
session.llmqType = llmqType;
session.quorumHash = quorumHash;
session.quorumHeight = (uint32_t)quorumHeight;
session.phase = 0;
session.statusBitset = 0;
session.members.clear();
session.members.resize((size_t)params.size);
session.receivedFinalCommitment = false;
}

void CDKGDebugManager::UpdateLocalStatus(std::function<bool(CDKGDebugStatus& status)>&& func)
Expand All @@ -311,15 +381,27 @@ void CDKGDebugManager::UpdateLocalStatus(std::function<bool(CDKGDebugStatus& sta
void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, std::function<bool(CDKGDebugSessionStatus& status)>&& func)
{
LOCK(cs);
if (func(localStatus.sessions.at(llmqType))) {

auto it = localStatus.sessions.find(llmqType);
if (it == localStatus.sessions.end()) {
return;
}

if (func(it->second)) {
localStatus.nTime = GetAdjustedTime();
}
}

void CDKGDebugManager::UpdateLocalMemberStatus(Consensus::LLMQType llmqType, size_t memberIdx, std::function<bool(CDKGDebugMemberStatus& status)>&& func)
{
LOCK(cs);
if (func(localStatus.sessions.at(llmqType).members.at(memberIdx))) {

auto it = localStatus.sessions.find(llmqType);
if (it == localStatus.sessions.end()) {
return;
}

if (func(it->second.members.at(memberIdx))) {
localStatus.nTime = GetAdjustedTime();
}
}
Expand Down Expand Up @@ -356,7 +438,7 @@ void CDKGDebugManager::SendLocalStatus()
status.nTime = nTime;
status.sig = activeMasternodeInfo.blsKeyOperator->Sign(status.GetSignHash());

ProcessDebugStatusMessage(-1, status);
ProcessDebugStatusMessage(newHash, status);
}

}
Loading

0 comments on commit 18950f9

Please sign in to comment.