From 3f419029837d670722a898627f84cddeeee0d39c Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Mon, 3 Feb 2020 21:04:30 +1000 Subject: [PATCH] net_processing: Retry notfounds with more urgency Anytime we see a NOTFOUND in response to a request for a tx, look through each of our peers for anyone else who announced the tx, find one who doesn't already have its inflight tx count maxed out, and of those, make the one who'd look at it first, look at it asap. Github-Pull: #18238 Rebased-From: a204d1586ca9e8f2d7b3951358fa5e63faa1810b --- src/net_processing.cpp | 45 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 34d349e8e9633..1812b9cffc400 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -73,6 +73,8 @@ static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::se static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; +/** Delay between receiving a NOTFOUND and trying the next peer. */ +static constexpr std::chrono::microseconds MAX_NOTFOUND_RETRY_RANDOM_DELAY{std::chrono::seconds{2}}; /** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10}; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, @@ -342,8 +344,10 @@ struct CNodeState { */ std::multimap m_tx_process_time; - //! Store all the transactions a peer has recently announced - std::set m_tx_announced; + /* Store all the transactions a peer has recently announced, + * along with their process time + */ + std::map m_tx_announced; //! Store transactions which were requested by us, with timestamp std::map m_tx_in_flight; @@ -729,6 +733,37 @@ std::chrono::microseconds CalculateTxGetDataTime(const uint256& txid, std::chron return process_time; } +static void RetryProcessTx(CConnman& connman, const uint256& txid, const std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState* best_d = nullptr; + std::chrono::microseconds best; + + for (auto& el : mapNodeState) { + CNodeState::TxDownloadState* d = &el.second.m_tx_download; + if (d->m_tx_in_flight.size() >= MAX_PEER_TX_IN_FLIGHT) continue; + auto it = d->m_tx_announced.find(txid); + if (it != d->m_tx_announced.end()) { + if (best_d == nullptr || (it->second != std::chrono::microseconds::zero() && it->second < best)) { + best_d = d; + best = it->second; + } + } + } + + std::chrono::microseconds process_time = current_time + GetRandMicros(MAX_NOTFOUND_RETRY_RANDOM_DELAY); + if (best_d != nullptr && process_time < best) { + auto end = best_d->m_tx_process_time.end(); + for (auto it = best_d->m_tx_process_time.lower_bound(best); it != end && it->first == best; ++it) { + if (it->second == txid) { + best_d->m_tx_process_time.erase(it); + best_d->m_tx_announced[txid] = process_time; + best_d->m_tx_process_time.emplace(process_time, txid); + break; + } + } + } +} + void RequestTx(CNodeState* state, const uint256& txid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; @@ -739,12 +774,12 @@ void RequestTx(CNodeState* state, const uint256& txid, std::chrono::microseconds // this announcement return; } - peer_download_state.m_tx_announced.insert(txid); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. const auto process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); + peer_download_state.m_tx_announced.emplace(txid, process_time); peer_download_state.m_tx_process_time.emplace(process_time, txid); } @@ -3215,6 +3250,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr std::vector vInv; vRecv >> vInv; if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + const auto current_time = GetTime(); for (CInv &inv : vInv) { if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) { // If we receive a NOTFOUND message for a txid we requested, erase @@ -3227,6 +3263,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } state->m_tx_download.m_tx_in_flight.erase(in_flight_it); state->m_tx_download.m_tx_announced.erase(inv.hash); + RetryProcessTx(*connman, inv.hash, current_time); } } } @@ -4063,6 +4100,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); + state.m_tx_download.m_tx_announced[txid] = std::chrono::microseconds::zero(); CInv inv(MSG_TX | GetFetchFlags(pto), txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, @@ -4084,6 +4122,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // requests to outbound peers). const auto next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, txid); + state.m_tx_download.m_tx_announced[txid] = next_process_time; } } else { // We have already seen this transaction, no need to download.