Skip to content

Commit

Permalink
p2p: Cache inbound reconciling peers count
Browse files Browse the repository at this point in the history
It helps to avoid recomputing every time we consider
a transaction for fanout/reconciliation.
  • Loading branch information
naumenkogs committed Jan 22, 2024
1 parent baf722b commit 82126d3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
}
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid, node.IsInboundConn());
m_num_preferred_download_peers -= state->fPreferredDownload;
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
assert(m_peers_downloading_from >= 0);
Expand Down Expand Up @@ -3623,7 +3623,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// We could have optimistically pre-registered/registered the peer. In that case,
// we should forget about the reconciliation state here if this wasn't followed
// by WTXIDRELAY (since WTXIDRELAY can't be announced later).
m_txreconciliation->ForgetPeer(pfrom.GetId());
m_txreconciliation->ForgetPeer(pfrom.GetId(), pfrom.IsInboundConn());
}
}

Expand Down
35 changes: 22 additions & 13 deletions src/node/txreconciliation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ class TxReconciliationTracker::Impl
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);

/*
* Keeps track of how many of the registered peers are inbound. Updated on registering or
* forgetting peers.
*/
size_t m_inbounds_count GUARDED_BY(m_txreconciliation_mutex){0};

/*
* A least-recently-added cache tracking which peers we should fanout a transaction to.
*
Expand Down Expand Up @@ -170,6 +176,9 @@ class TxReconciliationTracker::Impl
m_states.erase(recon_state);
m_states.emplace(peer_id, std::move(new_state));

if (is_peer_inbound && m_inbounds_count < std::numeric_limits<size_t>::max()) {
++m_inbounds_count;
}
return ReconciliationRegisterResult::SUCCESS;
}

Expand Down Expand Up @@ -214,13 +223,20 @@ class TxReconciliationTracker::Impl
return peer_state->m_local_set.erase(wtxid_to_remove) > 0;
}

void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
void ForgetPeer(NodeId peer_id, bool is_peer_inbound) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
if (m_states.erase(peer_id)) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id);
const auto peer = m_states.find(peer_id);
if (peer == m_states.end()) return;
const bool registered = std::holds_alternative<TxReconciliationState>(peer->second);
m_states.erase(peer);

if (registered && is_peer_inbound) {
Assert(m_inbounds_count > 0);
--m_inbounds_count;
}
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id);
}

/**
Expand Down Expand Up @@ -315,16 +331,9 @@ class TxReconciliationTracker::Impl
if (peer_state->m_we_initiate) {
destinations = OUTBOUND_FANOUT_DESTINATIONS - outbounds_nonrcncl_tx_relay;
} else {
const size_t inbound_rcncl_peers = std::count_if(m_states.begin(), m_states.end(),
[](const auto& indexed_state) {
const auto* cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state) return !cur_state->m_we_initiate;
return false;
});

// Since we use the fraction for inbound peers, we first need to compute the total
// number of inbound targets.
const double inbound_targets = (inbounds_nonrcncl_tx_relay + inbound_rcncl_peers) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
const double inbound_targets = (inbounds_nonrcncl_tx_relay + m_inbounds_count) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
destinations = inbound_targets - inbounds_nonrcncl_tx_relay;
}

Expand Down Expand Up @@ -363,9 +372,9 @@ bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wt
return m_impl->TryRemovingFromSet(peer_id, wtxid_to_remove);
}

void TxReconciliationTracker::ForgetPeer(NodeId peer_id)
void TxReconciliationTracker::ForgetPeer(NodeId peer_id, bool is_peer_inbound)
{
m_impl->ForgetPeer(peer_id);
m_impl->ForgetPeer(peer_id, is_peer_inbound);
}

bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
Expand Down
2 changes: 1 addition & 1 deletion src/node/txreconciliation.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TxReconciliationTracker
* Attempts to forget txreconciliation-related state of the peer (if we previously stored any).
* After this, we won't be able to reconcile transactions with the peer.
*/
void ForgetPeer(NodeId peer_id);
void ForgetPeer(NodeId peer_id, bool is_peer_inbound);

/**
* Check if a peer is registered to reconcile transactions with us.
Expand Down
16 changes: 10 additions & 6 deletions src/test/txreconciliation_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,21 @@ BOOST_AUTO_TEST_CASE(ForgetPeerTest)
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;

// Removing peer which is not there works.
tracker.ForgetPeer(peer_id0, true);
tracker.ForgetPeer(peer_id0, false);

// Removing peer after pre-registring works and does not let to register the peer.
tracker.PreRegisterPeer(peer_id0);
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::NOT_FOUND);

// Removing peer after it is registered works.
tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}

Expand All @@ -80,7 +84,7 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));

tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}

Expand All @@ -102,7 +106,7 @@ BOOST_AUTO_TEST_CASE(AddToSetTest)

BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));

tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())};
BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid2));

Expand Down Expand Up @@ -138,7 +142,7 @@ BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));

BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, true);
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
}

Expand Down Expand Up @@ -178,7 +182,7 @@ BOOST_AUTO_TEST_CASE(ShouldFanoutToTest)
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/1));
}

tracker.ForgetPeer(peer_id0);
tracker.ForgetPeer(peer_id0, false);
// A forgotten (reconciliation-wise) peer should be always selected for fanout again.
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(Wtxid::FromUint256(frc.rand256()), peer_id0,
Expand Down

0 comments on commit 82126d3

Please sign in to comment.