Skip to content

Commit

Permalink
Merge bitcoin#19829: net processing: Move block inventory state to ne…
Browse files Browse the repository at this point in the history
…t_processing

3002b4a [net processing] Guard m_continuation_block with m_block_inv_mutex (John Newbery)
184557e [net processing] Move hashContinue to net processing (John Newbery)
c853ef0 scripted-diff: rename vBlockHashesToAnnounce and vInventoryBlockToSend (John Newbery)
53b7ac1 [net processing] Move block inventory data to Peer (John Newbery)
78040f9 [net processing] Rename nStartingHeight to m_starting_height (John Newbery)
77a2c2f [net processing] Move nStartingHeight to Peer (John Newbery)
717a374 [net processing] Improve documentation for Peer destruction/locking (John Newbery)

Pull request description:

  This continues the work of moving application layer data into net_processing, by moving all block inventory state into the new Peer object added in bitcoin#19607.

  For motivation, see bitcoin#19398.

ACKs for top commit:
  jonatack:
    re-ACK 3002b4a per `git diff 9aad3e4 3002b4a`
  Sjors:
    Code review re-ACK 3002b4a
  MarcoFalke:
    re-ACK 3002b4a 🌓

Tree-SHA512: eb2b474b73b025791ee3e6e41809926b332b48468763219f31638ca390f427632f05902dfc6a2c6bdc1ce47b215782f67874ddbf05b97d77d5897b7e2abfe4d9
  • Loading branch information
MarcoFalke committed Dec 22, 2020
2 parents 209974d + 3002b4a commit df127ec
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 64 deletions.
2 changes: 0 additions & 2 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
stats.m_manual_connection = IsManualConn();
X(m_bip152_highbandwidth_to);
X(m_bip152_highbandwidth_from);
X(nStartingHeight);
{
LOCK(cs_vSend);
X(mapSendBytesPerMsgCmd);
Expand Down Expand Up @@ -2956,7 +2955,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
{
hSocket = hSocketIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
hashContinue = uint256();
if (conn_type_in != ConnectionType::BLOCK_RELAY) {
m_tx_relay = MakeUnique<TxRelay>();
}
Expand Down
13 changes: 1 addition & 12 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ class CNodeStats
bool m_manual_connection;
bool m_bip152_highbandwidth_to;
bool m_bip152_highbandwidth_from;
int nStartingHeight;
int m_starting_height;
uint64_t nSendBytes;
mapMsgCmdSize mapSendBytesPerMsgCmd;
uint64_t nRecvBytes;
Expand Down Expand Up @@ -993,8 +993,6 @@ class CNode
mapMsgCmdSize mapRecvBytesPerMsgCmd GUARDED_BY(cs_vRecv);

public:
uint256 hashContinue;
std::atomic<int> nStartingHeight{-1};
// We selected peer as (compact blocks) high-bandwidth peer (BIP152)
std::atomic<bool> m_bip152_highbandwidth_to{false};
// Peer selected us as (compact blocks) high-bandwidth peer (BIP152)
Expand All @@ -1007,12 +1005,6 @@ class CNode
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};

// List of block ids we still have announce.
// There is no final sorting before sending, as they are always sent immediately
// and in the order requested.
std::vector<uint256> vInventoryBlockToSend GUARDED_BY(cs_inventory);
Mutex cs_inventory;

struct TxRelay {
mutable RecursiveMutex cs_filter;
// We use fRelayTxes for two purposes -
Expand Down Expand Up @@ -1043,9 +1035,6 @@ class CNode
// m_tx_relay == nullptr if we're not relaying transactions with this peer
std::unique_ptr<TxRelay> m_tx_relay;

// Used for headers announcements - unfiltered blocks to relay
std::vector<uint256> vBlockHashesToAnnounce GUARDED_BY(cs_inventory);

/** UNIX epoch time of the last block received from this peer that we had
* not yet seen (e.g. not already received from another peer), that passed
* preliminary validity checks and was saved to disk, even if we don't
Expand Down
105 changes: 60 additions & 45 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ void PeerManager::FinalizeNode(const CNode& node, bool& fUpdateConnectionTime) {
LOCK(cs_main);
int misbehavior{0};
{
// We remove the PeerRef from g_peer_map here, but we don't always
// destruct the Peer. Sometimes another thread is still holding a
// PeerRef, so the refcount is >= 1. Be careful not to do any
// processing here that assumes Peer won't be changed before it's
// destructed.
PeerRef peer = RemovePeer(nodeid);
assert(peer != nullptr);
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
Expand Down Expand Up @@ -870,6 +875,7 @@ bool PeerManager::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
PeerRef peer = GetPeerRef(nodeid);
if (peer == nullptr) return false;
stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
stats.m_starting_height = peer->m_starting_height;

return true;
}
Expand Down Expand Up @@ -1309,13 +1315,17 @@ void PeerManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInde
}
}

// Relay to all peers
m_connman.ForEachNode([&vHashes](CNode* pnode) {
LOCK(pnode->cs_inventory);
for (const uint256& hash : reverse_iterate(vHashes)) {
pnode->vBlockHashesToAnnounce.push_back(hash);
{
LOCK(m_peer_mutex);
for (auto& it : m_peer_map) {
Peer& peer = *it.second;
LOCK(peer.m_block_inv_mutex);
for (const uint256& hash : reverse_iterate(vHashes)) {
peer.m_blocks_for_headers_relay.push_back(hash);
}
}
});
}

m_connman.WakeMessageHandler();
}

Expand Down Expand Up @@ -1465,7 +1475,7 @@ static void RelayAddress(const CNode& originator,
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
}

void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
void static ProcessGetBlockData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
{
bool send = false;
std::shared_ptr<const CBlock> a_recent_block;
Expand Down Expand Up @@ -1605,16 +1615,18 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c
}
}

// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == pfrom.hashContinue)
{
// Send immediately. This must send even if redundant,
// and we want it right after the last block so they don't
// wait for other stuff first.
std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
pfrom.hashContinue.SetNull();
LOCK(peer.m_block_inv_mutex);
// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == peer.m_continuation_block) {
// Send immediately. This must send even if redundant,
// and we want it right after the last block so they don't
// wait for other stuff first.
std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
peer.m_continuation_block.SetNull();
}
}
}
}
Expand Down Expand Up @@ -1714,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman);
ProcessGetBlockData(pfrom, peer, chainparams, inv, connman);
}
// else: If the first item on the queue is an unknown type, we erase it
// and continue processing the queue on the next call.
Expand Down Expand Up @@ -1764,7 +1776,9 @@ void PeerManager::SendBlockTransactions(CNode& pfrom, const CBlock& block, const
m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
}

void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block)
void PeerManager::ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block)
{
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
size_t nCount = headers.size();
Expand Down Expand Up @@ -1854,7 +1868,8 @@ void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHe
// Headers message had its maximum size; the peer may have more headers.
// TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue
// from there instead.
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom.GetId(), pfrom.nStartingHeight);
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n",
pindexLast->nHeight, pfrom.GetId(), peer.m_starting_height);
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256()));
}

Expand Down Expand Up @@ -2280,7 +2295,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ServiceFlags nServices;
int nVersion;
std::string cleanSubVer;
int nStartingHeight = -1;
int starting_height = -1;
bool fRelay = true;

vRecv >> nVersion >> nServiceInt >> nTime >> addrMe;
Expand Down Expand Up @@ -2311,7 +2326,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
cleanSubVer = SanitizeString(strSubVer);
}
if (!vRecv.empty()) {
vRecv >> nStartingHeight;
vRecv >> starting_height;
}
if (!vRecv.empty())
vRecv >> fRelay;
Expand Down Expand Up @@ -2360,7 +2375,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LOCK(pfrom.cs_SubVer);
pfrom.cleanSubVer = cleanSubVer;
}
pfrom.nStartingHeight = nStartingHeight;
peer->m_starting_height = starting_height;

// set nodes not relaying blocks and tx and not serving (parts) of the historical blockchain as "clients"
pfrom.fClient = (!(nServices & NODE_NETWORK) && !(nServices & NODE_NETWORK_LIMITED));
Expand Down Expand Up @@ -2440,7 +2455,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat

LogPrint(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, peer=%d%s\n",
cleanSubVer, pfrom.nVersion,
pfrom.nStartingHeight, addrMe.ToString(), pfrom.GetId(),
peer->m_starting_height, addrMe.ToString(), pfrom.GetId(),
remoteAddr);

int64_t nTimeOffset = nTime - GetTime();
Expand Down Expand Up @@ -2474,7 +2489,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat

if (!pfrom.IsInboundConn()) {
LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n",
pfrom.nVersion.load(), pfrom.nStartingHeight,
pfrom.nVersion.load(), peer->m_starting_height,
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToString()) : ""),
pfrom.ConnectionTypeAsString());
}
Expand Down Expand Up @@ -2786,13 +2801,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
break;
}
WITH_LOCK(pfrom.cs_inventory, pfrom.vInventoryBlockToSend.push_back(pindex->GetBlockHash()));
if (--nLimit <= 0)
{
WITH_LOCK(peer->m_block_inv_mutex, peer->m_blocks_for_inv_relay.push_back(pindex->GetBlockHash()));
if (--nLimit <= 0) {
// When this block is requested, we'll send an inv that'll
// trigger the peer to getblocks the next batch of inventory.
LogPrint(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
pfrom.hashContinue = pindex->GetBlockHash();
WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
break;
}
}
Expand Down Expand Up @@ -3316,7 +3330,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// the peer if the header turns out to be for an invalid block.
// Note that if a peer tries to build on an invalid chain, that
// will be detected and the peer will be disconnected/discouraged.
return ProcessHeadersMessage(pfrom, {cmpctblock.header}, /*via_compact_block=*/true);
return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.header}, /*via_compact_block=*/true);
}

if (fBlockReconstructed) {
Expand Down Expand Up @@ -3459,7 +3473,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ReadCompactSize(vRecv); // ignore tx count; assume it is 0.
}

return ProcessHeadersMessage(pfrom, headers, /*via_compact_block=*/false);
return ProcessHeadersMessage(pfrom, *peer, headers, /*via_compact_block=*/false);
}

if (msg_type == NetMsgType::BLOCK)
Expand Down Expand Up @@ -4067,6 +4081,7 @@ class CompareInvMempoolOrder

bool PeerManager::SendMessages(CNode* pto)
{
PeerRef peer = GetPeerRef(pto->GetId());
const Consensus::Params& consensusParams = m_chainparams.GetConsensus();

// We must call MaybeDiscourageAndDisconnect first, to ensure that we'll
Expand Down Expand Up @@ -4192,7 +4207,7 @@ bool PeerManager::SendMessages(CNode* pto)
got back an empty response. */
if (pindexStart->pprev)
pindexStart = pindexStart->pprev;
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight);
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height);
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256()));
}
}
Expand All @@ -4208,11 +4223,11 @@ bool PeerManager::SendMessages(CNode* pto)
// If no header would connect, or if we have too many
// blocks, or if the peer doesn't want headers, just
// add all to the inv queue.
LOCK(pto->cs_inventory);
LOCK(peer->m_block_inv_mutex);
std::vector<CBlock> vHeaders;
bool fRevertToInv = ((!state.fPreferHeaders &&
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
(!state.fPreferHeaderAndIDs || peer->m_blocks_for_headers_relay.size() > 1)) ||
peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE);
const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery
ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date

Expand All @@ -4221,7 +4236,7 @@ bool PeerManager::SendMessages(CNode* pto)
// Try to find first header that our peer doesn't have, and
// then send all headers past that one. If we come across any
// headers that aren't on ::ChainActive(), give up.
for (const uint256 &hash : pto->vBlockHashesToAnnounce) {
for (const uint256& hash : peer->m_blocks_for_headers_relay) {
const CBlockIndex* pindex = LookupBlockIndex(hash);
assert(pindex);
if (::ChainActive()[pindex->nHeight] != pindex) {
Expand All @@ -4238,7 +4253,7 @@ bool PeerManager::SendMessages(CNode* pto)
// which should be caught by the prior check), but one
// way this could happen is by using invalidateblock /
// reconsiderblock repeatedly on the tip, causing it to
// be added multiple times to vBlockHashesToAnnounce.
// be added multiple times to m_blocks_for_headers_relay.
// Robustly deal with this rare situation by reverting
// to an inv.
fRevertToInv = true;
Expand Down Expand Up @@ -4310,10 +4325,10 @@ bool PeerManager::SendMessages(CNode* pto)
}
if (fRevertToInv) {
// If falling back to using an inv, just try to inv the tip.
// The last entry in vBlockHashesToAnnounce was our tip at some point
// The last entry in m_blocks_for_headers_relay was our tip at some point
// in the past.
if (!pto->vBlockHashesToAnnounce.empty()) {
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
if (!peer->m_blocks_for_headers_relay.empty()) {
const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();
const CBlockIndex* pindex = LookupBlockIndex(hashToAnnounce);
assert(pindex);

Expand All @@ -4327,32 +4342,32 @@ bool PeerManager::SendMessages(CNode* pto)

// If the peer's chain has this block, don't inv it back.
if (!PeerHasHeader(&state, pindex)) {
pto->vInventoryBlockToSend.push_back(hashToAnnounce);
peer->m_blocks_for_inv_relay.push_back(hashToAnnounce);
LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__,
pto->GetId(), hashToAnnounce.ToString());
}
}
}
pto->vBlockHashesToAnnounce.clear();
peer->m_blocks_for_headers_relay.clear();
}

//
// Message: inventory
//
std::vector<CInv> vInv;
{
LOCK(pto->cs_inventory);
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
LOCK(peer->m_block_inv_mutex);
vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_MAX));

// Add blocks
for (const uint256& hash : pto->vInventoryBlockToSend) {
for (const uint256& hash : peer->m_blocks_for_inv_relay) {
vInv.push_back(CInv(MSG_BLOCK, hash));
if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
}
pto->vInventoryBlockToSend.clear();
peer->m_blocks_for_inv_relay.clear();

if (pto->m_tx_relay != nullptr) {
LOCK(pto->m_tx_relay->cs_tx_inventory);
Expand Down
Loading

0 comments on commit df127ec

Please sign in to comment.