Skip to content

Commit

Permalink
Guard vRecvGetData (now in net processing) with its own mutex
Browse files Browse the repository at this point in the history
Summary:
This requires slightly reorganizing the logic in GETBLOCKTXN to
maintain locking order.

This is a backport of [[bitcoin/bitcoin#19911 | core#19911]] [5/6]
bitcoin/bitcoin@ba95181

see D10187 for the removal of the `!cs_main` negative lock

Depends on D10529

Test Plan:
```
cmake .. -GNinja -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Debug -DENABLE_SANITIZERS=thread
ninja && ninja check check-functional
```

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Subscribers: Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10530
  • Loading branch information
narula authored and PiRK committed Nov 25, 2021
1 parent 0ee7f5a commit 95cde14
Showing 1 changed file with 72 additions and 54 deletions.
126 changes: 72 additions & 54 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,10 @@ struct Peer {
*/
std::set<TxId> m_orphan_work_set GUARDED_BY(g_cs_orphans);

/** Protects vRecvGetData **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/
std::deque<CInv> vRecvGetData;
std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex);

Peer(NodeId id) : m_id(id) {}
};
Expand Down Expand Up @@ -2164,18 +2166,14 @@ FindProofForGetData(const CNode &peer, const avalanche::ProofId &proofid,
return nullptr;
}

static void ProcessGetData(const Config &config, CNode &pfrom,
static void ProcessGetData(const Config &config, CNode &pfrom, Peer &peer,
CConnman &connman, CTxMemPool &mempool,
const std::atomic<bool> &interruptMsgProc)
LOCKS_EXCLUDED(cs_main) {
EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex)
LOCKS_EXCLUDED(::cs_main) {
AssertLockNotHeld(cs_main);

PeerRef peer = GetPeerRef(pfrom.GetId());
if (peer == nullptr) {
return;
}

std::deque<CInv>::iterator it = peer->vRecvGetData.begin();
std::deque<CInv>::iterator it = peer.vRecvGetData.begin();
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());

Expand All @@ -2189,7 +2187,7 @@ static void ProcessGetData(const Config &config, CNode &pfrom,
// Process as many TX or AVA_PROOF items from the front of the getdata
// queue as possible, since they're common and it's efficient to batch
// process them.
while (it != peer->vRecvGetData.end()) {
while (it != peer.vRecvGetData.end()) {
if (interruptMsgProc) {
return;
}
Expand Down Expand Up @@ -2277,7 +2275,7 @@ static void ProcessGetData(const Config &config, CNode &pfrom,

// Only process one BLOCK item per call, since they're uncommon and can be
// expensive to process.
if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) {
if (it != peer.vRecvGetData.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(config, pfrom, inv, connman, interruptMsgProc);
Expand All @@ -2286,7 +2284,7 @@ static void ProcessGetData(const Config &config, CNode &pfrom,
// and continue processing the queue on the next call.
}

peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it);
peer.vRecvGetData.erase(peer.vRecvGetData.begin(), it);

if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it
Expand Down Expand Up @@ -3411,9 +3409,14 @@ void PeerManager::ProcessMessage(const Config &config, CNode &pfrom,
vInv[0].ToString(), pfrom.GetId());
}

peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(),
vInv.end());
ProcessGetData(config, pfrom, m_connman, m_mempool, interruptMsgProc);
{
LOCK(peer->m_getdata_requests_mutex);
peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(),
vInv.end());
ProcessGetData(config, pfrom, *peer, m_connman, m_mempool,
interruptMsgProc);
}

return;
}

Expand Down Expand Up @@ -3518,43 +3521,47 @@ void PeerManager::ProcessMessage(const Config &config, CNode &pfrom,
return;
}

LOCK(cs_main);
{
LOCK(cs_main);

const CBlockIndex *pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !pindex->nStatus.hasData()) {
LogPrint(
BCLog::NET,
"Peer %d sent us a getblocktxn for a block we don't have\n",
pfrom.GetId());
return;
}
const CBlockIndex *pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !pindex->nStatus.hasData()) {
LogPrint(
BCLog::NET,
"Peer %d sent us a getblocktxn for a block we don't have\n",
pfrom.GetId());
return;
}

if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET,
"Peer %d sent us a getblocktxn for a block > %i deep\n",
pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
inv.type = MSG_BLOCK;
inv.hash = req.blockhash;
peer->vRecvGetData.push_back(inv);
// The message processing loop will go around again (without
// pausing) and we'll respond then (without cs_main)
return;
}
if (pindex->nHeight >=
::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex,
m_chainparams.GetConsensus());
assert(ret);

CBlock block;
bool ret =
ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
return;
}
}

SendBlockTransactions(pfrom, block, req);
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET,
"Peer %d sent us a getblocktxn for a block > %i deep\n",
pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
inv.type = MSG_BLOCK;
inv.hash = req.blockhash;
WITH_LOCK(peer->m_getdata_requests_mutex,
peer->vRecvGetData.push_back(inv));
// The message processing loop will go around again (without pausing)
// and we'll respond then (without cs_main)
return;
}

Expand Down Expand Up @@ -4864,8 +4871,12 @@ bool PeerManager::ProcessMessages(const Config &config, CNode *pfrom,
return false;
}

if (!peer->vRecvGetData.empty()) {
ProcessGetData(config, *pfrom, m_connman, m_mempool, interruptMsgProc);
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->vRecvGetData.empty()) {
ProcessGetData(config, *pfrom, *peer, m_connman, m_mempool,
interruptMsgProc);
}
}

{
Expand All @@ -4881,9 +4892,13 @@ bool PeerManager::ProcessMessages(const Config &config, CNode *pfrom,

// this maintains the order of responses and prevents vRecvGetData from
// growing unbounded
if (!peer->vRecvGetData.empty()) {
return true;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->vRecvGetData.empty()) {
return true;
}
}

{
LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) {
Expand Down Expand Up @@ -4961,8 +4976,11 @@ bool PeerManager::ProcessMessages(const Config &config, CNode *pfrom,
return false;
}

if (!peer->vRecvGetData.empty()) {
fMoreWork = true;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->vRecvGetData.empty()) {
fMoreWork = true;
}
}
} catch (const std::exception &e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n",
Expand Down

0 comments on commit 95cde14

Please sign in to comment.