From fa7fea3654203bf7e7bd504589dd564af7fc749d Mon Sep 17 00:00:00 2001 From: MarcoFalke Date: Fri, 24 Jan 2020 14:00:57 -0500 Subject: [PATCH] refactor: Remove mempool global from net This refactor does two things: * Pass mempool in to PeerLogicValidation * Pass m_mempool around where needed --- src/init.cpp | 11 +++--- src/net_processing.cpp | 57 ++++++++++++++++-------------- src/net_processing.h | 9 +++-- src/test/denialofservice_tests.cpp | 10 +++--- src/test/fuzz/process_message.cpp | 4 +-- src/test/util/setup_common.cpp | 2 +- 6 files changed, 49 insertions(+), 44 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index a637aac4d2079..97640b0658933 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1329,8 +1329,12 @@ bool AppInitMain(NodeContext& node) node.banman = MakeUnique(GetDataDir() / "banlist.dat", &uiInterface, gArgs.GetArg("-bantime", DEFAULT_MISBEHAVING_BANTIME)); assert(!node.connman); node.connman = std::unique_ptr(new CConnman(GetRand(std::numeric_limits::max()), GetRand(std::numeric_limits::max()))); + // Make mempool generally available in the node context. For example the connection manager, wallet, or RPC threads, + // which are all started after this, may use it from the node context. + assert(!node.mempool); + node.mempool = &::mempool; - node.peer_logic.reset(new PeerLogicValidation(node.connman.get(), node.banman.get(), *node.scheduler)); + node.peer_logic.reset(new PeerLogicValidation(node.connman.get(), node.banman.get(), *node.scheduler, *node.mempool)); RegisterValidationInterface(node.peer_logic.get()); // sanitize comments per BIP-0014, format user agent and check total size @@ -1678,11 +1682,6 @@ bool AppInitMain(NodeContext& node) return false; } - // Now that the chain state is loaded, make mempool generally available in the node context. For example the - // connection manager, wallet, or RPC threads, which are all started after this, may use it from the node context. - assert(!node.mempool); - node.mempool = &::mempool; - fs::path est_path = GetDataDir() / FEE_ESTIMATES_FILENAME; CAutoFile est_filein(fsbridge::fopen(est_path, "rb"), SER_DISK, CLIENT_VERSION); // Allowed to fail as this file IS missing on first startup. diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e4df9fb90e207..d9b048fd6510a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -465,7 +465,7 @@ static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs // returns false, still setting pit, if the block was already in flight from the same peer // pit will only be valid as long as the same cs_main lock is being held -static bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const CBlockIndex* pindex = nullptr, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { +static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint256& hash, const CBlockIndex* pindex = nullptr, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState *state = State(nodeid); assert(state != nullptr); @@ -1102,8 +1102,11 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para (GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT); } -PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler& scheduler) - : connman(connmanIn), m_banman(banman), m_stale_tip_check_time(0) +PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler& scheduler, CTxMemPool& pool) + : connman(connmanIn), + m_banman(banman), + m_mempool(pool), + m_stale_tip_check_time(0) { // Initialize global variables that cannot be constructed at startup. recentRejects.reset(new CRollingBloomFilter(120000, 0.000001)); @@ -1314,7 +1317,7 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const BlockValidatio // -bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool static AlreadyHave(const CInv& inv, const CTxMemPool& mempool) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { switch (inv.type) { @@ -1553,7 +1556,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c } } -void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const std::atomic& interruptMsgProc) LOCKS_EXCLUDED(cs_main) +void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic& interruptMsgProc) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); @@ -1666,7 +1669,7 @@ inline void static SendBlockTransactions(const CBlock& block, const BlockTransac connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp)); } -bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::vector& headers, const CChainParams& chainparams, bool via_compact_block) +bool static ProcessHeadersMessage(CNode* pfrom, CConnman* connman, CTxMemPool& mempool, const std::vector& headers, const CChainParams& chainparams, bool via_compact_block) { const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); size_t nCount = headers.size(); @@ -1794,7 +1797,7 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve } uint32_t nFetchFlags = GetFetchFlags(pfrom); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); - MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), pindex); + MarkBlockAsInFlight(mempool, pfrom->GetId(), pindex->GetBlockHash(), pindex); LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n", pindex->GetBlockHash().ToString(), pfrom->GetId()); } @@ -1848,7 +1851,7 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve return true; } -void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set, std::list& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +void static ProcessOrphanTx(CConnman* connman, CTxMemPool& mempool, std::set& orphan_work_set, std::list& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) { AssertLockHeld(cs_main); AssertLockHeld(g_cs_orphans); @@ -1908,7 +1911,7 @@ void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_se } } -bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, BanMan* banman, const std::atomic& interruptMsgProc) +bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CTxMemPool& mempool, CConnman* connman, BanMan* banman, const std::atomic& interruptMsgProc) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); if (gArgs.IsArgSet("-dropmessagestest") && GetRand(gArgs.GetArg("-dropmessagestest", 0)) == 0) @@ -2260,7 +2263,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR if (interruptMsgProc) return true; - bool fAlreadyHave = AlreadyHave(inv); + bool fAlreadyHave = AlreadyHave(inv, mempool); LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->GetId()); if (inv.type == MSG_TX) { @@ -2311,7 +2314,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR } pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams, connman, interruptMsgProc); + ProcessGetData(pfrom, chainparams, connman, mempool, interruptMsgProc); return true; } @@ -2528,7 +2531,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR std::list lRemovedTxn; - if (!AlreadyHave(inv) && + if (!AlreadyHave(inv, mempool) && AcceptToMemoryPool(mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { mempool.check(&::ChainstateActive().CoinsTip()); RelayTransaction(tx.GetHash(), *connman); @@ -2549,7 +2552,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn); + ProcessOrphanTx(connman, mempool, pfrom->orphan_work_set, lRemovedTxn); } else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS) { @@ -2567,7 +2570,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time); + if (!AlreadyHave(_inv, mempool)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time); } AddOrphanTx(ptx, pfrom->GetId()); @@ -2742,7 +2745,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || (fAlreadyInFlight && blockInFlightIt->second.first == pfrom->GetId())) { std::list::iterator* queuedBlockIt = nullptr; - if (!MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { + if (!MarkBlockAsInFlight(mempool, pfrom->GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&mempool)); else { @@ -2815,7 +2818,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR } // cs_main if (fProcessBLOCKTXN) - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, banman, interruptMsgProc); + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, mempool, connman, banman, interruptMsgProc); if (fRevertToHeaderProcessing) { // Headers received from HB compact block peers are permitted to be @@ -2823,7 +2826,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR // the peer if the header turns out to be for an invalid block. // Note that if a peer tries to build on an invalid chain, that // will be detected and the peer will be banned. - return ProcessHeadersMessage(pfrom, connman, {cmpctblock.header}, chainparams, /*via_compact_block=*/true); + return ProcessHeadersMessage(pfrom, connman, mempool, {cmpctblock.header}, chainparams, /*via_compact_block=*/true); } if (fBlockReconstructed) { @@ -2967,7 +2970,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vR ReadCompactSize(vRecv); // ignore tx count; assume it is 0. } - return ProcessHeadersMessage(pfrom, connman, headers, chainparams, /*via_compact_block=*/false); + return ProcessHeadersMessage(pfrom, connman, mempool, headers, chainparams, /*via_compact_block=*/false); } if (strCommand == NetMsgType::BLOCK) @@ -3285,12 +3288,12 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& inter bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams, connman, interruptMsgProc); + ProcessGetData(pfrom, chainparams, connman, m_mempool, interruptMsgProc); if (!pfrom->orphan_work_set.empty()) { std::list removed_txn; LOCK2(cs_main, g_cs_orphans); - ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn); + ProcessOrphanTx(connman, m_mempool, pfrom->orphan_work_set, removed_txn); for (const CTransactionRef& removedTx : removed_txn) { AddToCompactExtraTransactions(removedTx); } @@ -3353,7 +3356,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& inter bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.m_time, chainparams, connman, m_banman, interruptMsgProc); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.m_time, chainparams, m_mempool, connman, m_banman, interruptMsgProc); if (interruptMsgProc) return false; if (!pfrom->vRecvGetData.empty()) @@ -3819,7 +3822,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // Respond to BIP35 mempool requests if (fSendTrickle && pto->m_tx_relay->fSendMempool) { - auto vtxinfo = mempool.infoAll(); + auto vtxinfo = m_mempool.infoAll(); pto->m_tx_relay->fSendMempool = false; CFeeRate filterrate; { @@ -3865,7 +3868,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } // Topologically and fee-rate sort the inventory we send for privacy and priority reasons. // A heap is used so that not all items need sorting if only a few are being sent. - CompareInvMempoolOrder compareInvMempoolOrder(&mempool); + CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool); std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); // No reason to drain out at many times the network's capacity, // especially since we have many peers and some will draw much shorter delays. @@ -3884,7 +3887,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) continue; } // Not in the mempool anymore? don't bother sending it. - auto txinfo = mempool.info(hash); + auto txinfo = m_mempool.info(hash); if (!txinfo.tx) { continue; } @@ -3996,7 +3999,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) for (const CBlockIndex *pindex : vToDownload) { uint32_t nFetchFlags = GetFetchFlags(pto); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); - MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), pindex); + MarkBlockAsInFlight(m_mempool, pto->GetId(), pindex->GetBlockHash(), pindex); LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, pto->GetId()); } @@ -4039,7 +4042,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); CInv inv(MSG_TX | GetFetchFlags(pto), txid); - if (!AlreadyHave(inv)) { + if (!AlreadyHave(inv, m_mempool)) { // If this transaction was last requested more than 1 minute ago, // then request. const auto last_request_time = GetTxRequestTime(inv.hash); @@ -4077,7 +4080,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // We don't want white listed peers to filter txs to us if we have -whitelistforcerelay if (pto->m_tx_relay != nullptr && pto->nVersion >= FEEFILTER_VERSION && gArgs.GetBoolArg("-feefilter", DEFAULT_FEEFILTER) && !pto->HasPermission(PF_FORCERELAY)) { - CAmount currentFilter = mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); + CAmount currentFilter = m_mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); int64_t timeNow = GetTimeMicros(); if (timeNow > pto->m_tx_relay->nextSendTimeFeeFilter) { static CFeeRate default_feerate(DEFAULT_MIN_RELAY_TX_FEE); diff --git a/src/net_processing.h b/src/net_processing.h index 6f26abc209ee8..b73037722c44f 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -6,10 +6,12 @@ #ifndef BITCOIN_NET_PROCESSING_H #define BITCOIN_NET_PROCESSING_H -#include -#include #include +#include #include +#include + +class CTxMemPool; extern RecursiveMutex cs_main; @@ -23,11 +25,12 @@ class PeerLogicValidation final : public CValidationInterface, public NetEventsI private: CConnman* const connman; BanMan* const m_banman; + CTxMemPool& m_mempool; bool CheckIfBanned(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main); public: - PeerLogicValidation(CConnman* connman, BanMan* banman, CScheduler& scheduler); + PeerLogicValidation(CConnman* connman, BanMan* banman, CScheduler& scheduler, CTxMemPool& pool); /** * Overridden from CValidationInterface. diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index e5d51ab83bf03..73bce6f789709 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -78,7 +78,7 @@ BOOST_FIXTURE_TEST_SUITE(denialofservice_tests, TestingSetup) BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) { auto connman = MakeUnique(0x1337, 0x1337); - auto peerLogic = MakeUnique(connman.get(), nullptr, *m_node.scheduler); + auto peerLogic = MakeUnique(connman.get(), nullptr, *m_node.scheduler, *m_node.mempool); // Mock an outbound peer CAddress addr1(ip(0xa0b0c001), NODE_NONE); @@ -148,7 +148,7 @@ static void AddRandomOutboundPeer(std::vector &vNodes, PeerLogicValidat BOOST_AUTO_TEST_CASE(stale_tip_peer_management) { auto connman = MakeUnique(0x1337, 0x1337); - auto peerLogic = MakeUnique(connman.get(), nullptr, *m_node.scheduler); + auto peerLogic = MakeUnique(connman.get(), nullptr, *m_node.scheduler, *m_node.mempool); const Consensus::Params& consensusParams = Params().GetConsensus(); constexpr int max_outbound_full_relay = 8; @@ -221,7 +221,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning) { auto banman = MakeUnique(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); auto connman = MakeUnique(0x1337, 0x1337); - auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler); + auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool); banman->ClearBanned(); CAddress addr1(ip(0xa0b0c001), NODE_NONE); @@ -276,7 +276,7 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) { auto banman = MakeUnique(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); auto connman = MakeUnique(0x1337, 0x1337); - auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler); + auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool); banman->ClearBanned(); gArgs.ForceSetArg("-banscore", "111"); // because 11 is my favorite number @@ -323,7 +323,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) { auto banman = MakeUnique(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); auto connman = MakeUnique(0x1337, 0x1337); - auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler); + auto peerLogic = MakeUnique(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool); banman->ClearBanned(); int64_t nStartTime = GetTime(); diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 934f741068498..dc49dd499a156 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -32,7 +32,7 @@ #include #include -bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, BanMan* banman, const std::atomic& interruptMsgProc); +bool ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CTxMemPool& mempool, CConnman* connman, BanMan* banman, const std::atomic& interruptMsgProc); namespace { @@ -85,7 +85,7 @@ void test_one_input(const std::vector& buffer) p2p_node.SetSendVersion(PROTOCOL_VERSION); g_setup->m_node.peer_logic->InitializeNode(&p2p_node); try { - (void)ProcessMessage(&p2p_node, random_message_type, random_bytes_data_stream, GetTimeMillis(), Params(), g_setup->m_node.connman.get(), g_setup->m_node.banman.get(), std::atomic{false}); + (void)ProcessMessage(&p2p_node, random_message_type, random_bytes_data_stream, GetTimeMillis(), Params(), *g_setup->m_node.mempool, g_setup->m_node.connman.get(), g_setup->m_node.banman.get(), std::atomic{false}); } catch (const std::ios_base::failure& e) { const std::string exception_message{e.what()}; const auto p = EXPECTED_DESERIALIZATION_EXCEPTIONS.find(exception_message); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index ad3ff48860494..e19a96eafca1f 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -137,7 +137,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha m_node.mempool->setSanityCheck(1.0); m_node.banman = MakeUnique(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); m_node.connman = MakeUnique(0x1337, 0x1337); // Deterministic randomness for tests. - m_node.peer_logic = MakeUnique(m_node.connman.get(), m_node.banman.get(), *m_node.scheduler); + m_node.peer_logic = MakeUnique(m_node.connman.get(), m_node.banman.get(), *m_node.scheduler, *m_node.mempool); } TestingSetup::~TestingSetup()