Skip to content

Commit

Permalink
Merge pull request #3397 from codablock/pr_backport_txrequests
Browse files Browse the repository at this point in the history
Backport bitcoin#14897 and bitcoin#15834 and modify it to work with Dash messages
  • Loading branch information
codablock committed Apr 8, 2020
2 parents 410d4a6 + 5cf417b commit 26dec64
Show file tree
Hide file tree
Showing 23 changed files with 473 additions and 172 deletions.
1 change: 1 addition & 0 deletions src/Makefile.bench.include
Expand Up @@ -29,6 +29,7 @@ bench_bench_dash_SOURCES = \
bench/ccoins_caching.cpp \
bench/merkle_root.cpp \
bench/mempool_eviction.cpp \
bench/util_time.cpp \
bench/base58.cpp \
bench/lockedpool.cpp \
bench/poly1305.cpp \
Expand Down
42 changes: 42 additions & 0 deletions src/bench/util_time.cpp
@@ -0,0 +1,42 @@
// Copyright (c) 2019 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <bench/bench.h>

#include <utiltime.h>

static void BenchTimeDeprecated(benchmark::State& state)
{
while (state.KeepRunning()) {
(void)GetTime();
}
}

static void BenchTimeMock(benchmark::State& state)
{
SetMockTime(111);
while (state.KeepRunning()) {
(void)GetTime<std::chrono::seconds>();
}
SetMockTime(0);
}

static void BenchTimeMillis(benchmark::State& state)
{
while (state.KeepRunning()) {
(void)GetTime<std::chrono::milliseconds>();
}
}

static void BenchTimeMillisSys(benchmark::State& state)
{
while (state.KeepRunning()) {
(void)GetTimeMillis();
}
}

BENCHMARK(BenchTimeDeprecated/*, 100000000*/);
BENCHMARK(BenchTimeMillis/*, 6000000*/);
BENCHMARK(BenchTimeMillisSys/*, 6000000*/);
BENCHMARK(BenchTimeMock/*, 300000000*/);
8 changes: 4 additions & 4 deletions src/governance/governance.cpp
Expand Up @@ -135,7 +135,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm

{
LOCK(cs_main);
connman.RemoveAskFor(nHash);
EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash));
}

if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) {
Expand Down Expand Up @@ -210,7 +210,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm

{
LOCK(cs_main);
connman.RemoveAskFor(nHash);
EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nHash));
}

if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) {
Expand Down Expand Up @@ -1033,8 +1033,8 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector<CNode*>&
// stop early to prevent setAskFor overflow
{
LOCK(cs_main);
size_t nProjectedSize = pnode->setAskFor.size() + nProjectedVotes;
if (nProjectedSize > SETASKFOR_MAX_SZ / 2) continue;
size_t nProjectedSize = GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes;
if (nProjectedSize > MAX_INV_SZ) continue;
// to early to ask the same node
if (mapAskedRecently[nHashGovobj].count(pnode->addr)) continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_blockprocessor.cpp
Expand Up @@ -36,7 +36,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strC
auto hash = ::SerializeHash(qc);
{
LOCK(cs_main);
connman.RemoveAskFor(hash);
EraseObjectRequest(pfrom->GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, hash));
}

if (qc.IsNull()) {
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_chainlocks.cpp
Expand Up @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock
{
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash);
EraseObjectRequest(from, CInv(MSG_CLSIG, hash));
}

{
Expand Down
27 changes: 14 additions & 13 deletions src/llmq/quorums_dkgsessionhandler.cpp
Expand Up @@ -18,8 +18,9 @@
namespace llmq
{

CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) :
maxMessagesPerNode(_maxMessagesPerNode)
CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
maxMessagesPerNode(_maxMessagesPerNode),
invType(_invType)
{
}

Expand Down Expand Up @@ -50,7 +51,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv)
return;
}

g_connman->RemoveAskFor(hash);
EraseObjectRequest(from, CInv(invType, hash));

pendingMessages.emplace_back(std::make_pair(from, std::move(pm)));
}
Expand Down Expand Up @@ -90,10 +91,10 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp
blsWorker(_blsWorker),
dkgManager(_dkgManager),
curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
pendingComplaints((size_t)_params.size * 2),
pendingJustifications((size_t)_params.size * 2),
pendingPrematureCommitments((size_t)_params.size * 2)
pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT),
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT)
{
phaseHandlerThread = std::thread([this] {
RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str());
Expand Down Expand Up @@ -416,7 +417,7 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
return ret;
}

template<typename Message>
template<typename Message, int MessageType>
bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
{
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
Expand All @@ -443,7 +444,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi
auto hash = ::SerializeHash(msg);
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash);
EraseObjectRequest(p.first, CInv(MessageType, hash));
}

bool ban = false;
Expand Down Expand Up @@ -540,7 +541,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->Contribute(pendingContributions);
};
auto fContributeWait = [this] {
return ProcessPendingMessageBatch<CDKGContribution>(*curSession, pendingContributions, 8);
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, pendingContributions, 8);
};
HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);

Expand All @@ -549,7 +550,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndComplain(pendingComplaints);
};
auto fComplainWait = [this] {
return ProcessPendingMessageBatch<CDKGComplaint>(*curSession, pendingComplaints, 8);
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, pendingComplaints, 8);
};
HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);

Expand All @@ -558,7 +559,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndJustify(pendingJustifications);
};
auto fJustifyWait = [this] {
return ProcessPendingMessageBatch<CDKGJustification>(*curSession, pendingJustifications, 8);
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, pendingJustifications, 8);
};
HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);

Expand All @@ -567,7 +568,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndCommit(pendingPrematureCommitments);
};
auto fCommitWait = [this] {
return ProcessPendingMessageBatch<CDKGPrematureCommitment>(*curSession, pendingPrematureCommitments, 8);
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, pendingPrematureCommitments, 8);
};
HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);

Expand Down
3 changes: 2 additions & 1 deletion src/llmq/quorums_dkgsessionhandler.h
Expand Up @@ -40,13 +40,14 @@ class CDKGPendingMessages

private:
mutable CCriticalSection cs;
int invType;
size_t maxMessagesPerNode;
std::list<BinaryMessage> pendingMessages;
std::map<NodeId, size_t> messagesPerNode;
std::set<uint256> seenMessages;

public:
explicit CDKGPendingMessages(size_t _maxMessagesPerNode);
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType);

void PushPendingMessage(NodeId from, CDataStream& vRecv);
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
Expand Down
4 changes: 2 additions & 2 deletions src/llmq/quorums_instantsend.cpp
Expand Up @@ -902,7 +902,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
{
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash);
EraseObjectRequest(from, CInv(MSG_ISLOCK, hash));
}

CTransactionRef tx;
Expand Down Expand Up @@ -1367,7 +1367,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid)
txid.ToString(), pnode->GetId());

CInv inv(MSG_TX, txid);
pnode->AskFor(inv);
RequestObject(pnode->GetId(), inv, GetTime<std::chrono::microseconds>(), true);
}
}
for (CNode* pnode : nodesToAskFor) {
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_signing.cpp
Expand Up @@ -684,7 +684,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re

{
LOCK(cs_main);
connman.RemoveAskFor(recoveredSig.GetHash());
EraseObjectRequest(nodeId, CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig.GetHash()));
}

if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) {
Expand Down
68 changes: 0 additions & 68 deletions src/net.cpp
Expand Up @@ -105,8 +105,6 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost;
static bool vfLimited[NET_MAX] = {};
std::string strSubVersion;

unordered_limitedmap<uint256, int64_t, StaticSaltedHasher> mapAlreadyAskedFor(MAX_INV_SZ, MAX_INV_SZ * 2);

void CConnman::AddOneShot(const std::string& strDest)
{
LOCK(cs_vOneShots);
Expand Down Expand Up @@ -3175,16 +3173,6 @@ void CConnman::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash, const i
}
}

void CConnman::RemoveAskFor(const uint256& hash)
{
mapAlreadyAskedFor.erase(hash);

LOCK(cs_vNodes);
for (const auto& pnode : vNodes) {
pnode->RemoveAskFor(hash);
}
}

void CConnman::RecordBytesRecv(uint64_t bytes)
{
LOCK(cs_totalBytesRecv);
Expand Down Expand Up @@ -3390,62 +3378,6 @@ CNode::~CNode()
CloseSocket(hSocket);
}

void CNode::AskFor(const CInv& inv, int64_t doubleRequestDelay)
{
if (queueAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) {
int64_t nNow = GetTime();
if(nNow - nLastWarningTime > WARNING_INTERVAL) {
LogPrintf("CNode::AskFor -- WARNING: inventory message dropped: vecAskFor.size = %d, setAskFor.size = %d, MAPASKFOR_MAX_SZ = %d, SETASKFOR_MAX_SZ = %d, nSkipped = %d, peer=%d\n",
queueAskFor.size(), setAskFor.size(), MAPASKFOR_MAX_SZ, SETASKFOR_MAX_SZ, nNumWarningsSkipped, id);
nLastWarningTime = nNow;
nNumWarningsSkipped = 0;
}
else {
++nNumWarningsSkipped;
}
return;
}
// a peer may not have multiple non-responded queue positions for a single inv item
if (!setAskFor.emplace(inv.hash).second)
return;

// We're using queueAskFor as a priority queue,
// the key is the earliest time the request can be sent
int64_t nRequestTime;
auto it = mapAlreadyAskedFor.find(inv.hash);
if (it != mapAlreadyAskedFor.end())
nRequestTime = it->second;
else
nRequestTime = 0;

LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id);

// Make sure not to reuse time indexes to keep things in the same order
int64_t nNow = GetTimeMicros() - 1000000;
static int64_t nLastTime;
++nLastTime;
nNow = std::max(nNow, nLastTime);
nLastTime = nNow;

// Each retry is 2 minutes after the last
nRequestTime = std::max(nRequestTime + doubleRequestDelay, nNow);
if (it != mapAlreadyAskedFor.end())
mapAlreadyAskedFor.update(it, nRequestTime);
else
mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime));

queueAskFor.emplace(nRequestTime, inv);
setAskForInQueue.emplace(inv.hash);
}

void CNode::RemoveAskFor(const uint256& hash)
{
setAskFor.erase(hash);
// we don't really remove it from queueAskFor as it would be too expensive to rebuild the heap
// instead, we're ignoring the entry later as it won't be found in setAskForInQueue anymore
setAskForInQueue.erase(hash);
}

bool CConnman::NodeFullyConnected(const CNode* pnode)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
Expand Down
13 changes: 0 additions & 13 deletions src/net.h
Expand Up @@ -90,10 +90,6 @@ static const bool DEFAULT_UPNP = USE_UPNP;
#else
static const bool DEFAULT_UPNP = false;
#endif
/** The maximum number of entries in mapAskFor */
static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ;
/** The maximum number of entries in setAskFor (larger due to getdata latency)*/
static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ;
/** The maximum number of peer connections to maintain.
* Masternodes are forced to accept at least this many connections
*/
Expand Down Expand Up @@ -355,7 +351,6 @@ class CConnman
void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false);
// This overload will not update node filters, so use it only for the cases when other messages will update related transaction data in filters
void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false);
void RemoveAskFor(const uint256& hash);

// Addrman functions
size_t GetAddressCount() const;
Expand Down Expand Up @@ -667,8 +662,6 @@ extern bool fDiscover;
extern bool fListen;
extern bool fRelayTxes;

extern unordered_limitedmap<uint256, int64_t, StaticSaltedHasher> mapAlreadyAskedFor;

/** Subversion as sent to the P2P network in `version` messages */
extern std::string strSubVersion;

Expand Down Expand Up @@ -864,9 +857,6 @@ class CNode
// List of non-tx/non-block inventory items
std::vector<CInv> vInventoryOtherToSend;
CCriticalSection cs_inventory;
std::unordered_set<uint256, StaticSaltedHasher> setAskFor;
std::unordered_set<uint256, StaticSaltedHasher> setAskForInQueue;
std::priority_queue<std::pair<int64_t, CInv>, std::vector<std::pair<int64_t, CInv>>, std::greater<>> queueAskFor;
int64_t nNextInvSend;
// Used for headers announcements - unfiltered blocks to relay
// Also protected by cs_inventory
Expand Down Expand Up @@ -1041,9 +1031,6 @@ class CNode
vBlockHashesToAnnounce.push_back(hash);
}

void AskFor(const CInv& inv, int64_t doubleRequestDelay = 2 * 60 * 1000000);
void RemoveAskFor(const uint256& hash);

void CloseSocketDisconnect();

void copyStats(CNodeStats &stats);
Expand Down

0 comments on commit 26dec64

Please sign in to comment.