Skip to content

Commit

Permalink
Interrupt orphan processing after every transaction
Browse files Browse the repository at this point in the history
This makes orphan processing work like handling getdata messages:
After every actual transaction validation attempt, interrupt
processing to deal with messages arriving from other peers.
  • Loading branch information
sipa committed Mar 22, 2019
1 parent 5314a48 commit ebf3331
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/net.h
Expand Up @@ -16,6 +16,7 @@
#include <limitedmap.h>
#include <netaddress.h>
#include <policy/feerate.h>
#include <primitives/transaction.h>
#include <protocol.h>
#include <random.h>
#include <streams.h>
Expand Down Expand Up @@ -739,6 +740,9 @@ class CNode
CAmount lastSentFeeFilter{0};
int64_t nextSendTimeFeeFilter{0};

std::deque<std::set<CTransactionRef>::iterator> orphan_work_queue;
std::set<CTransactionRef> orphan_work_set;

CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode();
CNode(const CNode&) = delete;
Expand Down
23 changes: 16 additions & 7 deletions src/net_processing.cpp
Expand Up @@ -1718,7 +1718,8 @@ void static ProcessOrphanTx(CConnman* connman, std::deque<std::set<CTransactionR
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
std::set<NodeId> setMisbehaving;
while (!orphan_work_queue.empty()) {
bool done = false;
while (!done && !orphan_work_queue.empty()) {
auto workset_it = orphan_work_queue.front();
orphan_work_queue.pop_front();
CTransactionRef porphanTx = *workset_it;
Expand Down Expand Up @@ -1750,6 +1751,7 @@ void static ProcessOrphanTx(CConnman* connman, std::deque<std::set<CTransactionR
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
Expand All @@ -1769,6 +1771,7 @@ void static ProcessOrphanTx(CConnman* connman, std::deque<std::set<CTransactionR
recentRejects->insert(orphanHash);
}
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip.get());
}
Expand Down Expand Up @@ -2403,9 +2406,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}

std::set<CTransactionRef> orphan_work_set;
std::deque<std::set<CTransactionRef>::iterator> orphan_work_queue;

CTransactionRef ptx;
vRecv >> ptx;
const CTransaction& tx = *ptx;
Expand Down Expand Up @@ -2433,8 +2433,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
auto ins = orphan_work_set.insert(elem->second.tx);
if (ins.second) orphan_work_queue.push_back(ins.first);
auto ins = pfrom->orphan_work_set.insert(elem->second.tx);
if (ins.second) pfrom->orphan_work_queue.push_back(ins.first);
}
}
}
Expand All @@ -2447,7 +2447,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);

// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(connman, orphan_work_queue, orphan_work_set, lRemovedTxn);
ProcessOrphanTx(connman, pfrom->orphan_work_queue, pfrom->orphan_work_set, lRemovedTxn);
}
else if (fMissingInputs)
{
Expand Down Expand Up @@ -3179,11 +3179,20 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);

if (!pfrom->orphan_work_queue.empty()) {
std::list<CTransactionRef> removed_txn;
ProcessOrphanTx(connman, pfrom->orphan_work_queue, pfrom->orphan_work_set, removed_txn);
for (const CTransactionRef& removedTx : removed_txn) {
AddToCompactExtraTransactions(removedTx);
}
}

if (pfrom->fDisconnect)
return false;

// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_queue.empty()) return true;

// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
Expand Down

0 comments on commit ebf3331

Please sign in to comment.