Skip to content

net processing: Move block inventory state to net_processing #19829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
stats.m_manual_connection = IsManualConn();
X(m_bip152_highbandwidth_to);
X(m_bip152_highbandwidth_from);
X(nStartingHeight);
{
LOCK(cs_vSend);
X(mapSendBytesPerMsgCmd);
Expand Down Expand Up @@ -2956,7 +2955,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
{
hSocket = hSocketIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
hashContinue = uint256();
if (conn_type_in != ConnectionType::BLOCK_RELAY) {
m_tx_relay = MakeUnique<TxRelay>();
}
Expand Down
13 changes: 1 addition & 12 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ class CNodeStats
bool m_manual_connection;
bool m_bip152_highbandwidth_to;
bool m_bip152_highbandwidth_from;
int nStartingHeight;
int m_starting_height;
uint64_t nSendBytes;
mapMsgCmdSize mapSendBytesPerMsgCmd;
uint64_t nRecvBytes;
Expand Down Expand Up @@ -993,8 +993,6 @@ class CNode
mapMsgCmdSize mapRecvBytesPerMsgCmd GUARDED_BY(cs_vRecv);

public:
uint256 hashContinue;
std::atomic<int> nStartingHeight{-1};
// We selected peer as (compact blocks) high-bandwidth peer (BIP152)
std::atomic<bool> m_bip152_highbandwidth_to{false};
// Peer selected us as (compact blocks) high-bandwidth peer (BIP152)
Expand All @@ -1007,12 +1005,6 @@ class CNode
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};

// List of block ids we still have announce.
// There is no final sorting before sending, as they are always sent immediately
// and in the order requested.
std::vector<uint256> vInventoryBlockToSend GUARDED_BY(cs_inventory);
Mutex cs_inventory;

struct TxRelay {
mutable RecursiveMutex cs_filter;
// We use fRelayTxes for two purposes -
Expand Down Expand Up @@ -1043,9 +1035,6 @@ class CNode
// m_tx_relay == nullptr if we're not relaying transactions with this peer
std::unique_ptr<TxRelay> m_tx_relay;

// Used for headers announcements - unfiltered blocks to relay
std::vector<uint256> vBlockHashesToAnnounce GUARDED_BY(cs_inventory);

/** UNIX epoch time of the last block received from this peer that we had
* not yet seen (e.g. not already received from another peer), that passed
* preliminary validity checks and was saved to disk, even if we don't
Expand Down
105 changes: 60 additions & 45 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ void PeerManager::FinalizeNode(const CNode& node, bool& fUpdateConnectionTime) {
LOCK(cs_main);
int misbehavior{0};
{
// We remove the PeerRef from g_peer_map here, but we don't always
// destruct the Peer. Sometimes another thread is still holding a
// PeerRef, so the refcount is >= 1. Be careful not to do any
// processing here that assumes Peer won't be changed before it's
// destructed.
PeerRef peer = RemovePeer(nodeid);
assert(peer != nullptr);
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
Expand Down Expand Up @@ -870,6 +875,7 @@ bool PeerManager::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
PeerRef peer = GetPeerRef(nodeid);
if (peer == nullptr) return false;
stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
stats.m_starting_height = peer->m_starting_height;

return true;
}
Expand Down Expand Up @@ -1309,13 +1315,17 @@ void PeerManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInde
}
}

// Relay to all peers
m_connman.ForEachNode([&vHashes](CNode* pnode) {
LOCK(pnode->cs_inventory);
for (const uint256& hash : reverse_iterate(vHashes)) {
pnode->vBlockHashesToAnnounce.push_back(hash);
{
LOCK(m_peer_mutex);
for (auto& it : m_peer_map) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a50dee5 : ForEachNode performs an additional check NodeFullyConnected (pnode->fSuccessfullyConnected && !pnode->fDisconnect;).

Not sure if that matters much here. Could we prematurely send headers before VERACK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent observation. Block hashes could be pushed into m_blocks_for_headers_relay before the version/verack handshake is complete. However, we'll never send any blocks inventory before we're fully connected due to the guard clause at the top of SendMessages():

bitcoin/src/net_processing.cpp

Lines 4077 to 4078 in c434e2c

if (!pto->fSuccessfullyConnected || pto->fDisconnect)
return true;

I think this behaviour is ok. The version-verack handshake usually lasts no more than a second or two, and at the worst case times out after a minute, so we're never going to push many block hashes onto this vector.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a50dee5

Any reason to not have a ForEachPeer member function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had one but removed it because it didn't seem worth it. We can revisit that decision later if it seems useful.

ForEachNode was useful because it allowed passing a lambda into CConnman which would be executed while the cs_vNodes lock was held. Here, the m_peer_mutex is in PeerMan, so it can be held directly.

Peer& peer = *it.second;
LOCK(peer.m_block_inv_mutex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the first place where we are acquiring a lock while already holding m_peer_mutex. Any thoughts on what is the best way to document this design, so that future code writers know that acquiring m_peer_mutex while holding a peer's m_block_inv_mutex is not allowed?

Or, would it be better to first copy all the Peer objects to a temporary place, release the m_peer_mutex lock, and then grab the per-peer m_block_inv_mutex locks to avoid holding both at the same time? (I don't know or have an opinion; just trying to figure out the locking design here.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a comment next to m_peer_mutex suffice here? This design (take m_peer_mutex first, then lock the mutexes protecting the individual data members) is analogous to ForEachNode() was doing here before (take cs_vNodes first, then lock the mutexes protecting the individual data members in the lambda - here cs_inventory).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments to the first commit. Let me know if you think more is required.

for (const uint256& hash : reverse_iterate(vHashes)) {
peer.m_blocks_for_headers_relay.push_back(hash);
}
}
});
}

m_connman.WakeMessageHandler();
}

Expand Down Expand Up @@ -1465,7 +1475,7 @@ static void RelayAddress(const CNode& originator,
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
}

void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
void static ProcessGetBlockData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
{
bool send = false;
std::shared_ptr<const CBlock> a_recent_block;
Expand Down Expand Up @@ -1605,16 +1615,18 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c
}
}

// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == pfrom.hashContinue)
{
// Send immediately. This must send even if redundant,
// and we want it right after the last block so they don't
// wait for other stuff first.
std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
pfrom.hashContinue.SetNull();
LOCK(peer.m_block_inv_mutex);
// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == peer.m_continuation_block) {
// Send immediately. This must send even if redundant,
// and we want it right after the last block so they don't
// wait for other stuff first.
std::vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
peer.m_continuation_block.SetNull();
}
}
}
}
Expand Down Expand Up @@ -1714,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman);
ProcessGetBlockData(pfrom, peer, chainparams, inv, connman);
}
// else: If the first item on the queue is an unknown type, we erase it
// and continue processing the queue on the next call.
Expand Down Expand Up @@ -1764,7 +1776,9 @@ void PeerManager::SendBlockTransactions(CNode& pfrom, const CBlock& block, const
m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
}

void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block)
void PeerManager::ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block)
{
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
size_t nCount = headers.size();
Expand Down Expand Up @@ -1854,7 +1868,8 @@ void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHe
// Headers message had its maximum size; the peer may have more headers.
// TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue
// from there instead.
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom.GetId(), pfrom.nStartingHeight);
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n",
pindexLast->nHeight, pfrom.GetId(), peer.m_starting_height);
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256()));
}

Expand Down Expand Up @@ -2280,7 +2295,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ServiceFlags nServices;
int nVersion;
std::string cleanSubVer;
int nStartingHeight = -1;
int starting_height = -1;
bool fRelay = true;

vRecv >> nVersion >> nServiceInt >> nTime >> addrMe;
Expand Down Expand Up @@ -2311,7 +2326,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
cleanSubVer = SanitizeString(strSubVer);
}
if (!vRecv.empty()) {
vRecv >> nStartingHeight;
vRecv >> starting_height;
}
if (!vRecv.empty())
vRecv >> fRelay;
Expand Down Expand Up @@ -2360,7 +2375,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LOCK(pfrom.cs_SubVer);
pfrom.cleanSubVer = cleanSubVer;
}
pfrom.nStartingHeight = nStartingHeight;
peer->m_starting_height = starting_height;

// set nodes not relaying blocks and tx and not serving (parts) of the historical blockchain as "clients"
pfrom.fClient = (!(nServices & NODE_NETWORK) && !(nServices & NODE_NETWORK_LIMITED));
Expand Down Expand Up @@ -2440,7 +2455,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat

LogPrint(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, peer=%d%s\n",
cleanSubVer, pfrom.nVersion,
pfrom.nStartingHeight, addrMe.ToString(), pfrom.GetId(),
peer->m_starting_height, addrMe.ToString(), pfrom.GetId(),
remoteAddr);

int64_t nTimeOffset = nTime - GetTime();
Expand Down Expand Up @@ -2474,7 +2489,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat

if (!pfrom.IsInboundConn()) {
LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n",
pfrom.nVersion.load(), pfrom.nStartingHeight,
pfrom.nVersion.load(), peer->m_starting_height,
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToString()) : ""),
pfrom.ConnectionTypeAsString());
}
Expand Down Expand Up @@ -2786,13 +2801,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
break;
}
WITH_LOCK(pfrom.cs_inventory, pfrom.vInventoryBlockToSend.push_back(pindex->GetBlockHash()));
if (--nLimit <= 0)
{
WITH_LOCK(peer->m_block_inv_mutex, peer->m_blocks_for_inv_relay.push_back(pindex->GetBlockHash()));
if (--nLimit <= 0) {
// When this block is requested, we'll send an inv that'll
// trigger the peer to getblocks the next batch of inventory.
LogPrint(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
pfrom.hashContinue = pindex->GetBlockHash();
WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
break;
}
}
Expand Down Expand Up @@ -3316,7 +3330,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// the peer if the header turns out to be for an invalid block.
// Note that if a peer tries to build on an invalid chain, that
// will be detected and the peer will be disconnected/discouraged.
return ProcessHeadersMessage(pfrom, {cmpctblock.header}, /*via_compact_block=*/true);
return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.header}, /*via_compact_block=*/true);
}

if (fBlockReconstructed) {
Expand Down Expand Up @@ -3459,7 +3473,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
ReadCompactSize(vRecv); // ignore tx count; assume it is 0.
}

return ProcessHeadersMessage(pfrom, headers, /*via_compact_block=*/false);
return ProcessHeadersMessage(pfrom, *peer, headers, /*via_compact_block=*/false);
}

if (msg_type == NetMsgType::BLOCK)
Expand Down Expand Up @@ -4067,6 +4081,7 @@ class CompareInvMempoolOrder

bool PeerManager::SendMessages(CNode* pto)
{
PeerRef peer = GetPeerRef(pto->GetId());
const Consensus::Params& consensusParams = m_chainparams.GetConsensus();

// We must call MaybeDiscourageAndDisconnect first, to ensure that we'll
Expand Down Expand Up @@ -4192,7 +4207,7 @@ bool PeerManager::SendMessages(CNode* pto)
got back an empty response. */
if (pindexStart->pprev)
pindexStart = pindexStart->pprev;
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight);
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height);
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256()));
}
}
Expand All @@ -4208,11 +4223,11 @@ bool PeerManager::SendMessages(CNode* pto)
// If no header would connect, or if we have too many
// blocks, or if the peer doesn't want headers, just
// add all to the inv queue.
LOCK(pto->cs_inventory);
LOCK(peer->m_block_inv_mutex);
std::vector<CBlock> vHeaders;
bool fRevertToInv = ((!state.fPreferHeaders &&
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
(!state.fPreferHeaderAndIDs || peer->m_blocks_for_headers_relay.size() > 1)) ||
peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE);
const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery
ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date

Expand All @@ -4221,7 +4236,7 @@ bool PeerManager::SendMessages(CNode* pto)
// Try to find first header that our peer doesn't have, and
// then send all headers past that one. If we come across any
// headers that aren't on ::ChainActive(), give up.
for (const uint256 &hash : pto->vBlockHashesToAnnounce) {
for (const uint256& hash : peer->m_blocks_for_headers_relay) {
const CBlockIndex* pindex = LookupBlockIndex(hash);
assert(pindex);
if (::ChainActive()[pindex->nHeight] != pindex) {
Expand All @@ -4238,7 +4253,7 @@ bool PeerManager::SendMessages(CNode* pto)
// which should be caught by the prior check), but one
// way this could happen is by using invalidateblock /
// reconsiderblock repeatedly on the tip, causing it to
// be added multiple times to vBlockHashesToAnnounce.
// be added multiple times to m_blocks_for_headers_relay.
// Robustly deal with this rare situation by reverting
// to an inv.
fRevertToInv = true;
Expand Down Expand Up @@ -4310,10 +4325,10 @@ bool PeerManager::SendMessages(CNode* pto)
}
if (fRevertToInv) {
// If falling back to using an inv, just try to inv the tip.
// The last entry in vBlockHashesToAnnounce was our tip at some point
// The last entry in m_blocks_for_headers_relay was our tip at some point
// in the past.
if (!pto->vBlockHashesToAnnounce.empty()) {
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
if (!peer->m_blocks_for_headers_relay.empty()) {
const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();
const CBlockIndex* pindex = LookupBlockIndex(hashToAnnounce);
assert(pindex);

Expand All @@ -4327,32 +4342,32 @@ bool PeerManager::SendMessages(CNode* pto)

// If the peer's chain has this block, don't inv it back.
if (!PeerHasHeader(&state, pindex)) {
pto->vInventoryBlockToSend.push_back(hashToAnnounce);
peer->m_blocks_for_inv_relay.push_back(hashToAnnounce);
LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__,
pto->GetId(), hashToAnnounce.ToString());
}
}
}
pto->vBlockHashesToAnnounce.clear();
peer->m_blocks_for_headers_relay.clear();
}

//
// Message: inventory
//
std::vector<CInv> vInv;
{
LOCK(pto->cs_inventory);
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
LOCK(peer->m_block_inv_mutex);
vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_MAX));

// Add blocks
for (const uint256& hash : pto->vInventoryBlockToSend) {
for (const uint256& hash : peer->m_blocks_for_inv_relay) {
vInv.push_back(CInv(MSG_BLOCK, hash));
if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
}
pto->vInventoryBlockToSend.clear();
peer->m_blocks_for_inv_relay.clear();

if (pto->m_tx_relay != nullptr) {
LOCK(pto->m_tx_relay->cs_tx_inventory);
Expand Down
Loading