diff --git a/src/Makefile.bench.include b/src/Makefile.bench.include index 20192f9fed9fd..b5ae95a3bb386 100644 --- a/src/Makefile.bench.include +++ b/src/Makefile.bench.include @@ -29,6 +29,7 @@ bench_bench_dash_SOURCES = \ bench/ccoins_caching.cpp \ bench/merkle_root.cpp \ bench/mempool_eviction.cpp \ + bench/util_time.cpp \ bench/base58.cpp \ bench/lockedpool.cpp \ bench/poly1305.cpp \ diff --git a/src/bench/util_time.cpp b/src/bench/util_time.cpp new file mode 100644 index 0000000000000..6900ff3f33119 --- /dev/null +++ b/src/bench/util_time.cpp @@ -0,0 +1,42 @@ +// Copyright (c) 2019 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include + +static void BenchTimeDeprecated(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTime(); + } +} + +static void BenchTimeMock(benchmark::State& state) +{ + SetMockTime(111); + while (state.KeepRunning()) { + (void)GetTime(); + } + SetMockTime(0); +} + +static void BenchTimeMillis(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTime(); + } +} + +static void BenchTimeMillisSys(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTimeMillis(); + } +} + +BENCHMARK(BenchTimeDeprecated/*, 100000000*/); +BENCHMARK(BenchTimeMillis/*, 6000000*/); +BENCHMARK(BenchTimeMillisSys/*, 6000000*/); +BENCHMARK(BenchTimeMock/*, 300000000*/); diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 949da6b19b1f3..53675979cc43c 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -135,7 +135,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - connman.RemoveAskFor(nHash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash)); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { @@ -210,7 +210,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - connman.RemoveAskFor(nHash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nHash)); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { @@ -1033,8 +1033,8 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& // stop early to prevent setAskFor overflow { LOCK(cs_main); - size_t nProjectedSize = pnode->setAskFor.size() + nProjectedVotes; - if (nProjectedSize > SETASKFOR_MAX_SZ / 2) continue; + size_t nProjectedSize = GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes; + if (nProjectedSize > MAX_INV_SZ) continue; // to early to ask the same node if (mapAskedRecently[nHashGovobj].count(pnode->addr)) continue; } diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 86ab347521195..c13caa17372ed 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -36,7 +36,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strC auto hash = ::SerializeHash(qc); { LOCK(cs_main); - connman.RemoveAskFor(hash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, hash)); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index c43055505edfe..f31ecee175131 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(from, CInv(MSG_CLSIG, hash)); } { diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index 45453f12ccfca..ef0369a05bc19 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -18,8 +18,9 @@ namespace llmq { -CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) : - maxMessagesPerNode(_maxMessagesPerNode) +CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) : + maxMessagesPerNode(_maxMessagesPerNode), + invType(_invType) { } @@ -50,7 +51,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) return; } - g_connman->RemoveAskFor(hash); + EraseObjectRequest(from, CInv(invType, hash)); pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); } @@ -90,10 +91,10 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp blsWorker(_blsWorker), dkgManager(_dkgManager), curSession(std::make_shared(_params, _blsWorker, _dkgManager)), - pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) - pendingComplaints((size_t)_params.size * 2), - pendingJustifications((size_t)_params.size * 2), - pendingPrematureCommitments((size_t)_params.size * 2) + pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) + pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT), + pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION), + pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT) { phaseHandlerThread = std::thread([this] { RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str()); @@ -416,7 +417,7 @@ std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector< return ret; } -template +template bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount) { auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); @@ -443,7 +444,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi auto hash = ::SerializeHash(msg); { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(p.first, CInv(MessageType, hash)); } bool ban = false; @@ -540,7 +541,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->Contribute(pendingContributions); }; auto fContributeWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); + return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); }; HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); @@ -549,7 +550,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndComplain(pendingComplaints); }; auto fComplainWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); + return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); }; HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); @@ -558,7 +559,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndJustify(pendingJustifications); }; auto fJustifyWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); + return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); }; HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); @@ -567,7 +568,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndCommit(pendingPrematureCommitments); }; auto fCommitWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); + return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); }; HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); diff --git a/src/llmq/quorums_dkgsessionhandler.h b/src/llmq/quorums_dkgsessionhandler.h index 5244c6d9ceb0b..7ee399973bd89 100644 --- a/src/llmq/quorums_dkgsessionhandler.h +++ b/src/llmq/quorums_dkgsessionhandler.h @@ -40,13 +40,14 @@ class CDKGPendingMessages private: mutable CCriticalSection cs; + int invType; size_t maxMessagesPerNode; std::list pendingMessages; std::map messagesPerNode; std::set seenMessages; public: - explicit CDKGPendingMessages(size_t _maxMessagesPerNode); + explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType); void PushPendingMessage(NodeId from, CDataStream& vRecv); std::list PopPendingMessages(size_t maxCount); diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index 355650517058a..f245874f1091e 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -902,7 +902,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has { { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(from, CInv(MSG_ISLOCK, hash)); } CTransactionRef tx; @@ -1367,7 +1367,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid) txid.ToString(), pnode->GetId()); CInv inv(MSG_TX, txid); - pnode->AskFor(inv); + RequestObject(pnode->GetId(), inv, GetTime(), true); } } for (CNode* pnode : nodesToAskFor) { diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 1d28ee61f2ac5..cc95268d50f49 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -684,7 +684,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - connman.RemoveAskFor(recoveredSig.GetHash()); + EraseObjectRequest(nodeId, CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig.GetHash())); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/net.cpp b/src/net.cpp index 9a49b275f5bef..f07c177263181 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -105,8 +105,6 @@ std::map mapLocalHost; static bool vfLimited[NET_MAX] = {}; std::string strSubVersion; -unordered_limitedmap mapAlreadyAskedFor(MAX_INV_SZ, MAX_INV_SZ * 2); - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -3175,16 +3173,6 @@ void CConnman::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash, const i } } -void CConnman::RemoveAskFor(const uint256& hash) -{ - mapAlreadyAskedFor.erase(hash); - - LOCK(cs_vNodes); - for (const auto& pnode : vNodes) { - pnode->RemoveAskFor(hash); - } -} - void CConnman::RecordBytesRecv(uint64_t bytes) { LOCK(cs_totalBytesRecv); @@ -3390,62 +3378,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::AskFor(const CInv& inv, int64_t doubleRequestDelay) -{ - if (queueAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) { - int64_t nNow = GetTime(); - if(nNow - nLastWarningTime > WARNING_INTERVAL) { - LogPrintf("CNode::AskFor -- WARNING: inventory message dropped: vecAskFor.size = %d, setAskFor.size = %d, MAPASKFOR_MAX_SZ = %d, SETASKFOR_MAX_SZ = %d, nSkipped = %d, peer=%d\n", - queueAskFor.size(), setAskFor.size(), MAPASKFOR_MAX_SZ, SETASKFOR_MAX_SZ, nNumWarningsSkipped, id); - nLastWarningTime = nNow; - nNumWarningsSkipped = 0; - } - else { - ++nNumWarningsSkipped; - } - return; - } - // a peer may not have multiple non-responded queue positions for a single inv item - if (!setAskFor.emplace(inv.hash).second) - return; - - // We're using queueAskFor as a priority queue, - // the key is the earliest time the request can be sent - int64_t nRequestTime; - auto it = mapAlreadyAskedFor.find(inv.hash); - if (it != mapAlreadyAskedFor.end()) - nRequestTime = it->second; - else - nRequestTime = 0; - - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + doubleRequestDelay, nNow); - if (it != mapAlreadyAskedFor.end()) - mapAlreadyAskedFor.update(it, nRequestTime); - else - mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); - - queueAskFor.emplace(nRequestTime, inv); - setAskForInQueue.emplace(inv.hash); -} - -void CNode::RemoveAskFor(const uint256& hash) -{ - setAskFor.erase(hash); - // we don't really remove it from queueAskFor as it would be too expensive to rebuild the heap - // instead, we're ignoring the entry later as it won't be found in setAskForInQueue anymore - setAskForInQueue.erase(hash); -} - bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index 90c23b6300dd6..8c6fb5391c66c 100644 --- a/src/net.h +++ b/src/net.h @@ -90,10 +90,6 @@ static const bool DEFAULT_UPNP = USE_UPNP; #else static const bool DEFAULT_UPNP = false; #endif -/** The maximum number of entries in mapAskFor */ -static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ; -/** The maximum number of entries in setAskFor (larger due to getdata latency)*/ -static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ; /** The maximum number of peer connections to maintain. * Masternodes are forced to accept at least this many connections */ @@ -355,7 +351,6 @@ class CConnman void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false); // This overload will not update node filters, so use it only for the cases when other messages will update related transaction data in filters void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false); - void RemoveAskFor(const uint256& hash); // Addrman functions size_t GetAddressCount() const; @@ -667,8 +662,6 @@ extern bool fDiscover; extern bool fListen; extern bool fRelayTxes; -extern unordered_limitedmap mapAlreadyAskedFor; - /** Subversion as sent to the P2P network in `version` messages */ extern std::string strSubVersion; @@ -864,9 +857,6 @@ class CNode // List of non-tx/non-block inventory items std::vector vInventoryOtherToSend; CCriticalSection cs_inventory; - std::unordered_set setAskFor; - std::unordered_set setAskForInQueue; - std::priority_queue, std::vector>, std::greater<>> queueAskFor; int64_t nNextInvSend; // Used for headers announcements - unfiltered blocks to relay // Also protected by cs_inventory @@ -1041,9 +1031,6 @@ class CNode vBlockHashesToAnnounce.push_back(hash); } - void AskFor(const CInv& inv, int64_t doubleRequestDelay = 2 * 60 * 1000000); - void RemoveAskFor(const uint256& hash); - void CloseSocketDisconnect(); void copyStats(CNodeStats &stats); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 4e0d7322cb046..2bbcf1d0ae57f 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -57,6 +57,23 @@ # error "Dash Core cannot be compiled without assertions." #endif +/** Maximum number of in-flight objects from a peer */ +static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100; +/** Maximum number of announced objects from a peer */ +static constexpr int32_t MAX_PEER_OBJECT_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; +/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ +static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; +/** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ +static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, +"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); +/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ +static const unsigned int MAX_GETDATA_SZ = 1000; + std::atomic nTimeBestReceived(0); // Used only to inform the wallet of when we last received a block struct IteratorComparator @@ -254,6 +271,69 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; + /* + * State associated with objects download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, inv) inside the peer's + * CNodeState (m_object_process_time) as long as m_object_announced for the peer + * isn't too big (MAX_PEER_OBJECT_ANNOUNCEMENTS). + * + * The process_time for a objects is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the objects from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * objects (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the objects in m_object_process_time, looking + * at the objects whose process_time <= nNow. We'll request each + * such objects that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_OBJECT_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested inv, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate objects + * requests amongst our peers. + * + * For objects that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, inv) + * back into the peer's m_object_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GetObjectInterval + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a objects from a peer, we remove the inv from the + * peer's m_object_in_flight set and from their recently announced set + * (m_object_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the objects is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct ObjectDownloadState { + /* Track when to attempt download of announced objects (process + * time in micros -> inv) + */ + std::multimap m_object_process_time; + + //! Store all the objects a peer has recently announced + std::set m_object_announced; + + //! Store objects which were requested by us, with timestamp + std::map m_object_in_flight; + + //! Periodically check for stuck getdata requests + std::chrono::microseconds m_check_expiry_timer{0}; + }; + + ObjectDownloadState m_object_download; + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; nMisbehavior = 0; @@ -279,6 +359,10 @@ struct CNodeState { } }; +// Keeps track of the time (in microseconds) when transactions were requested last time +unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); +unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); + /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -571,9 +655,149 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector())); + + if (nodestate) { + nodestate->m_object_download.m_object_announced.erase(inv); + nodestate->m_object_download.m_object_in_flight.erase(inv); + } +} + +void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + EraseObjectRequest(state, inv); +} + +std::chrono::microseconds GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto it = g_already_asked_for.find(hash); + if (it != g_already_asked_for.end()) { + return it->second; + } + return {}; +} + +void UpdateObjectRequestTime(const uint256& hash, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto it = g_already_asked_for.find(hash); + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(hash, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + +std::chrono::microseconds GetObjectInterval(int invType) +{ + // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA + switch(invType) + { + case MSG_QUORUM_RECOVERED_SIG: + return std::chrono::seconds{15}; + case MSG_CLSIG: + return std::chrono::seconds{5}; + case MSG_ISLOCK: + return std::chrono::seconds{10}; + default: + return GETDATA_TX_INTERVAL; + } +} + +std::chrono::microseconds GetObjectExpiryInterval(int invType) +{ + return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; +} + +std::chrono::microseconds GetObjectRandomDelay(int invType) +{ + if (invType == MSG_TX) { + return GetRandMicros(MAX_GETDATA_RANDOM_DELAY); + } + return {}; +} + +std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + std::chrono::microseconds process_time; + const auto last_request_time = GetObjectRequestTime(inv.hash); + // First time requesting this tx + if (last_request_time.count() == 0) { + process_time = current_time; + } else { + // Randomize the delay to avoid biasing some peers over others (such as due to + // fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GetObjectInterval(inv.type) + GetObjectRandomDelay(inv.type); + } + + // We delay processing announcements from inbound peers + if (inv.type == MSG_TX && !fMasternodeMode && use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; + + return process_time; +} + +void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download; + if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_announced.count(inv)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_object_announced.insert(inv); + + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload); + + peer_download_state.m_object_process_time.emplace(process_time, inv); + + if (fForce) { + // make sure this object is actually requested ASAP + g_erased_object_requests.erase(inv.hash); + g_already_asked_for.erase(inv.hash); + } + + LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); +} + +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + RequestObject(state, inv, current_time, fForce); +} + +size_t GetRequestedObjectCount(NodeId nodeId) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return 0; + } + return state->m_object_download.m_object_process_time.size(); +} + // This function is used for testing the stale tip eviction logic, see // DoS_tests.cpp void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) @@ -1450,12 +1674,19 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam if (!vNotFound.empty()) { // Let the peer know that we didn't find what it asked for, so it doesn't - // have to wait around forever. Currently only SPV clients actually care - // about this message: it's needed when they are recursively walking the - // dependencies of relevant unconfirmed transactions. SPV clients want to - // do that because they want to know about (and store and rebroadcast and - // risk analyze) the dependencies of transactions relevant to them, without - // having to download the entire memory pool. + // have to wait around forever. + // SPV clients care about this message: it's needed when they are + // recursively walking the dependencies of relevant unconfirmed + // transactions. SPV clients want to do that because they want to know + // about (and store and rebroadcast and risk analyze) the dependencies + // of transactions relevant to them, without having to download the + // entire memory pool. + // Also, other nodes can use these messages to automatically request a + // transaction from some other peer that annnounced it, and stop + // waiting for us to respond. + // In normal operation, we often send NOTFOUND messages for parents of + // transactions that we relay; if a peer is missing a parent, they may + // assume we have them and request the parents from us. connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); } } @@ -2189,6 +2420,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr LOCK(cs_main); + const auto current_time = GetTime(); + for (CInv &inv : vInv) { if(!inv.IsKnownType()) { @@ -2244,20 +2477,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } else if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) { - int64_t doubleRequestDelay = 2 * 60 * 1000000; - // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA - switch (inv.type) { - case MSG_QUORUM_RECOVERED_SIG: - doubleRequestDelay = 15 * 1000000; - break; - case MSG_CLSIG: - doubleRequestDelay = 5 * 1000000; - break; - case MSG_ISLOCK: - doubleRequestDelay = 10 * 1000000; - break; - } - pfrom->AskFor(inv, doubleRequestDelay); + RequestObject(State(pfrom->GetId()), inv, current_time); } } } @@ -2489,10 +2709,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr CInv inv(nInvType, tx.GetHash()); pfrom->AddInventoryKnown(inv); - { - LOCK(cs_main); - connman->RemoveAskFor(inv.hash); - } // Process custom logic, no matter if tx will be accepted to mempool later or not if (nInvType == MSG_DSTX) { @@ -2546,6 +2762,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; + EraseObjectRequest(pfrom->GetId(), inv); + if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */, false /* bypass_limits */, 0 /* nAbsurdFee */)) { // Process custom txes, this changes AlreadyHave to "true" @@ -2587,14 +2805,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } } if (!fRejectedParents) { + const auto current_time = GetTime(); + for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv2); - if (!AlreadyHave(_inv2)) pfrom->AskFor(_inv2); + if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time); } AddOrphanTx(ptx, pfrom->GetId()); @@ -3203,8 +3423,27 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (strCommand == NetMsgType::NOTFOUND) { - // We do not care about the NOTFOUND message, but logging an Unknown Command - // message would be undesirable as we transmit it ourselves. + // Remove the NOTFOUND transactions from the peer + LOCK(cs_main); + CNodeState *state = State(pfrom->GetId()); + std::vector vInv; + vRecv >> vInv; + if (vInv.size() <= MAX_PEER_OBJECT_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for (CInv &inv : vInv) { + if (inv.IsKnownType()) { + // If we receive a NOTFOUND message for a txid we requested, erase + // it from our data structures for this peer. + auto in_flight_it = state->m_object_download.m_object_in_flight.find(inv); + if (in_flight_it == state->m_object_download.m_object_in_flight.end()) { + // Skip any further work if this is a spurious NOTFOUND + // message. + continue; + } + state->m_object_download.m_object_in_flight.erase(in_flight_it); + state->m_object_download.m_object_announced.erase(inv); + } + } + } return true; } @@ -3970,6 +4209,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); // Detect whether we're stalling + const auto current_time = GetTime(); + // nNow is the current system time (GetTimeMicros is not mockable) and + // should be replaced by the mockable current_time eventually nNow = GetTimeMicros(); if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, @@ -4055,33 +4297,71 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // - while (!pto->queueAskFor.empty() && pto->queueAskFor.top().first <= nNow) - { - const CInv& inv = pto->queueAskFor.top().second; - auto jt = pto->setAskForInQueue.find(inv.hash); - if (jt == pto->setAskForInQueue.end()) { - pto->queueAskFor.pop(); + + // For robustness, expire old requests after a long timeout, so that + // we can resume downloading objects from a peer even if they + // were unresponsive in the past. + // Eventually we should consider disconnecting peers, but this is + // conservative. + if (state.m_object_download.m_check_expiry_timer <= current_time) { + for (auto it=state.m_object_download.m_object_in_flight.begin(); it != state.m_object_download.m_object_in_flight.end();) { + if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { + LogPrint(BCLog::NET, "timeout of inflight object %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_object_download.m_object_announced.erase(it->first); + state.m_object_download.m_object_in_flight.erase(it++); + } else { + ++it; + } + } + // On average, we do this check every GetObjectExpiryInterval. Randomize + // so that we're not doing this for all peers at the same time. + state.m_object_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); + } + + // DASH this code also handles non-TXs (Dash specific messages) + auto& object_process_time = state.m_object_download.m_object_process_time; + while (!object_process_time.empty() && object_process_time.begin()->first <= current_time && state.m_object_download.m_object_in_flight.size() < MAX_PEER_OBJECT_IN_FLIGHT) { + const CInv inv = object_process_time.begin()->second; + // Erase this entry from object_process_time (it may be added back for + // processing at a later time, see below) + object_process_time.erase(object_process_time.begin()); + if (g_erased_object_requests.count(inv.hash)) { + LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); continue; } - - if (!AlreadyHave(inv)) - { - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- requesting inv = %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) - { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); - vGetData.clear(); + if (!AlreadyHave(inv)) { + // If this object was last requested more than GetObjectInterval ago, + // then request. + const auto last_request_time = GetObjectRequestTime(inv.hash); + if (last_request_time <= current_time - GetObjectInterval(inv.type)) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateObjectRequestTime(inv.hash, current_time); + state.m_object_download.m_object_in_flight.emplace(inv, current_time); + } else { + // This object is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload); + object_process_time.emplace(next_process_time, inv); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } } else { - //If we're not going to ask, don't expect a response. - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- already have inv = %s peer=%d\n", inv.ToString(), pto->GetId()); - pto->setAskFor.erase(inv.hash); + // We have already seen this object, no need to download. + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); + LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } - pto->queueAskFor.pop(); - pto->setAskForInQueue.erase(jt); } + + if (!vGetData.empty()) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); diff --git a/src/net_processing.h b/src/net_processing.h index 667bf913b548e..1c070160a3e82 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -84,4 +84,8 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch, const std::string& message=""); bool IsBanned(NodeId nodeid); +void EraseObjectRequest(NodeId nodeId, const CInv& inv); +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce=false); +size_t GetRequestedObjectCount(NodeId nodeId); + #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/random.cpp b/src/random.cpp index 0a354adc5888f..b562df6dc1ad8 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -364,6 +364,11 @@ uint64_t GetRand(uint64_t nMax) return (nRand % nMax); } +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept +{ + return std::chrono::microseconds{GetRand(duration_max.count())}; +} + int GetRandInt(int nMax) { return GetRand(nMax); diff --git a/src/random.h b/src/random.h index ff009047a1c4d..51c1568d9e879 100644 --- a/src/random.h +++ b/src/random.h @@ -10,7 +10,8 @@ #include #include -#include +#include // For std::chrono::microseconds +#include /* Seed OpenSSL PRNG with additional entropy data */ void RandAddSeed(); @@ -20,6 +21,7 @@ void RandAddSeed(); */ void GetRandBytes(unsigned char* buf, int num); uint64_t GetRand(uint64_t nMax); +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; int GetRandInt(int nMax); uint256 GetRandHash(); diff --git a/src/spork.cpp b/src/spork.cpp index acbb1d5fb821a..f1c65133d2c21 100644 --- a/src/spork.cpp +++ b/src/spork.cpp @@ -131,7 +131,7 @@ void CSporkManager::ProcessSpork(CNode* pfrom, const std::string& strCommand, CD std::string strLogMsg; { LOCK(cs_main); - connman.RemoveAskFor(hash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_SPORK, hash)); if(!chainActive.Tip()) return; strLogMsg = strprintf("SPORK -- hash: %s id: %d value: %10d bestHeight: %d peer=%d", hash.ToString(), spork.nSporkID, spork.nValue, chainActive.Height(), pfrom->GetId()); } diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 1254391e01818..f7726420e2ee4 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -354,6 +354,27 @@ BOOST_AUTO_TEST_CASE(gettime) BOOST_CHECK((GetTime() & ~0xFFFFFFFFLL) == 0); } +BOOST_AUTO_TEST_CASE(util_time_GetTime) +{ + SetMockTime(111); + // Check that mock time does not change after a sleep + for (const auto& num_sleep : {0, 1}) { + MilliSleep(num_sleep); + BOOST_CHECK_EQUAL(111, GetTime()); // Deprecated time getter + BOOST_CHECK_EQUAL(111, GetTime().count()); + BOOST_CHECK_EQUAL(111000, GetTime().count()); + BOOST_CHECK_EQUAL(111000000, GetTime().count()); + } + + SetMockTime(0); + // Check that system time changes after a sleep + const auto ms_0 = GetTime(); + const auto us_0 = GetTime(); + MilliSleep(1); + BOOST_CHECK(ms_0 < GetTime()); + BOOST_CHECK(us_0 < GetTime()); +} + BOOST_AUTO_TEST_CASE(test_ParseInt32) { int32_t n; diff --git a/src/utiltime.cpp b/src/utiltime.cpp index 735495dca0fd4..1027a69459e87 100644 --- a/src/utiltime.cpp +++ b/src/utiltime.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2015 The Bitcoin Core developers +// Copyright (c) 2009-2019 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -29,6 +29,20 @@ int64_t GetTime() return now; } +template +T GetTime() +{ + const std::chrono::seconds mocktime{nMockTime.load(std::memory_order_relaxed)}; + + return std::chrono::duration_cast( + mocktime.count() ? + mocktime : + std::chrono::microseconds{GetTimeMicros()}); +} +template std::chrono::seconds GetTime(); +template std::chrono::milliseconds GetTime(); +template std::chrono::microseconds GetTime(); + void SetMockTime(int64_t nMockTimeIn) { nMockTime.store(nMockTimeIn, std::memory_order_relaxed); diff --git a/src/utiltime.h b/src/utiltime.h index 8ae8540b89c70..b5e6b1d39f772 100644 --- a/src/utiltime.h +++ b/src/utiltime.h @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2015 The Bitcoin Core developers +// Copyright (c) 2009-2019 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -8,25 +8,32 @@ #include #include +#include /** - * GetTimeMicros() and GetTimeMillis() both return the system time, but in - * different units. GetTime() returns the system time in seconds, but also - * supports mocktime, where the time can be specified by the user, eg for - * testing (eg with the setmocktime rpc, or -mocktime argument). - * - * TODO: Rework these functions to be type-safe (so that we don't inadvertently - * compare numbers with different units, or compare a mocktime to system time). + * DEPRECATED + * Use either GetSystemTimeInSeconds (not mockable) or GetTime (mockable) */ - int64_t GetTime(); + +/** Returns the system time (not mockable) */ int64_t GetTimeMillis(); +/** Returns the system time (not mockable) */ int64_t GetTimeMicros(); +/** Returns the system time (not mockable) */ int64_t GetSystemTimeInSeconds(); // Like GetTime(), but not mockable + +/** For testing. Set e.g. with the setmocktime rpc, or -mocktime argument */ void SetMockTime(int64_t nMockTimeIn); +/** For testing */ int64_t GetMockTime(); + void MilliSleep(int64_t n); +/** Return system time (or mocked time, if set) */ +template +T GetTime(); + std::string DateTimeStrFormat(const char* pszFormat, int64_t nTime); #endif // BITCOIN_UTILTIME_H diff --git a/test/functional/p2p-instantsend.py b/test/functional/p2p-instantsend.py index 02f60115b978a..c8f3815dc6b60 100755 --- a/test/functional/p2p-instantsend.py +++ b/test/functional/p2p-instantsend.py @@ -57,7 +57,7 @@ def test_block_doublespend(self): # wait for the transaction to propagate connected_nodes = self.nodes.copy() del connected_nodes[self.isolated_idx] - sync_mempools(connected_nodes) + sync_mempools(connected_nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) for node in connected_nodes: self.wait_for_instantlock(is_id, node) # send doublespend transaction to isolated node @@ -119,7 +119,7 @@ def test_mempool_doublespend(self): receiver_addr = receiver.getnewaddress() is_id = sender.sendtoaddress(receiver_addr, 0.9) # wait for the transaction to propagate - sync_mempools(self.nodes) + sync_mempools(self.nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) for node in self.nodes: self.wait_for_instantlock(is_id, node) assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", isolated.getrawtransaction, dblspnd_txid) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 94e37adc9ff24..30ae7979c2053 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -352,13 +352,15 @@ def sync_all(self, node_groups=None): for group in node_groups: sync_blocks(group) - sync_mempools(group) + sync_mempools(group, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) def disable_mocktime(self): self.mocktime = 0 - def bump_mocktime(self, t): + def bump_mocktime(self, t, update_nodes=False): self.mocktime += t + if update_nodes: + set_node_times(self.nodes, self.mocktime) def set_cache_mocktime(self): # For backwared compatibility of the python scripts diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 2f5cfba790cc0..7ddc9a18234b8 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -435,7 +435,7 @@ def sync_chain(rpc_connections, *, wait=1, timeout=60): timeout -= wait raise AssertionError("Chain sync failed: Best block hashes don't match") -def sync_mempools(rpc_connections, *, wait=1, timeout=60): +def sync_mempools(rpc_connections, *, wait=1, timeout=60, wait_func=None): """ Wait until everybody has the same transactions in their memory pools @@ -448,6 +448,8 @@ def sync_mempools(rpc_connections, *, wait=1, timeout=60): num_match = num_match + 1 if num_match == len(rpc_connections): return + if wait_func is not None: + wait_func() time.sleep(wait) timeout -= wait raise AssertionError("Mempool sync failed") diff --git a/test/functional/walletbackup.py b/test/functional/walletbackup.py index 6c79a0eb1a2fd..8798c92fe8965 100755 --- a/test/functional/walletbackup.py +++ b/test/functional/walletbackup.py @@ -70,7 +70,7 @@ def do_one_round(self): # Have the miner (node3) mine a block. # Must sync mempools before mining. - sync_mempools(self.nodes) + sync_mempools(self.nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) self.nodes[3].generate(1) sync_blocks(self.nodes)