Skip to content

Commit

Permalink
p2p: Add transactions to reconciliation sets
Browse files Browse the repository at this point in the history
Transactions eligible for reconciliation are added to the
reconciliation sets. For the remaining txs, low-fanout is used.

Co-authored-by: Martin Zumsande <mzumsande@gmail.com>
  • Loading branch information
naumenkogs and mzumsande committed Nov 2, 2023
1 parent 8c658b9 commit 983f8c6
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 2 deletions.
63 changes: 61 additions & 2 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
/** The compactblocks version we support. See BIP 152. */
static constexpr uint64_t CMPCTBLOCKS_VERSION{2};
/** Used to determine whether to use low-fanout flooding (or reconciliation) for a tx relay event. */
static const uint64_t RANDOMIZER_ID_FANOUTTARGET = 0xbac89af818407b6aULL; // SHA256("fanouttarget")[0:8]

// Internal stuff
namespace {
Expand Down Expand Up @@ -3911,6 +3913,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
if (!fAlreadyHave && !m_chainman.IsInitialBlockDownload()) {
AddTxAnnouncement(pfrom, gtxid, current_time);
}
if (m_txreconciliation && gtxid.IsWtxid()) {
m_txreconciliation->TryRemovingFromSet(pfrom.GetId(), gtxid.GetHash());
}
} else {
LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId());
}
Expand Down Expand Up @@ -4197,6 +4202,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);

if (m_txreconciliation) m_txreconciliation->TryRemovingFromSet(pfrom.GetId(), wtxid);

// We do the AlreadyHaveTx() check using wtxid, rather than txid - in the
// absence of witness malleation, this is strictly better, because the
// recent rejects filter may contain the wtxid but rarely contains
Expand Down Expand Up @@ -5752,9 +5759,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}

if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
// Lock way before it's used to maintain lock ordering.
LOCK2(m_mempool.cs, m_peer_mutex);
LOCK(tx_relay->m_tx_inventory_mutex);
// Check whether periodic sends should happen
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
const bool reconciles_txs = m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId());
if (tx_relay->m_next_inv_send_time < current_time) {
fSendTrickle = true;
if (pto->IsInboundConn()) {
Expand Down Expand Up @@ -5814,6 +5824,28 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0;

size_t inbounds_nonrcncl_tx_relay = 0, outbounds_nonrcncl_tx_relay = 0;
if (m_txreconciliation) {
for (auto [cur_peer_id, cur_peer] : m_peer_map) {
// Skip the source of the transaction.
if (cur_peer_id == pto->GetId()) continue;
const auto cur_state{State(cur_peer_id)};
if (!cur_state) continue;
if (auto peer_tx_relay = cur_peer->GetTxRelay()) {
LOCK(peer_tx_relay->m_bloom_filter_mutex);
// When we consider to which (and how many) Erlay peers
// we should fanout a tx, we must know to how
// many peers we would certainly announce this tx
// (non-Erlay peers).
if (peer_tx_relay->m_relay_txs && !m_txreconciliation->IsPeerRegistered(cur_peer_id)) {
inbounds_nonrcncl_tx_relay += cur_state->m_is_inbound;
outbounds_nonrcncl_tx_relay += !cur_state->m_is_inbound;
}
}
}
}

LOCK(tx_relay->m_bloom_filter_mutex);
size_t broadcast_max{INVENTORY_BROADCAST_TARGET + (tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(INVENTORY_BROADCAST_MAX, broadcast_max);
Expand Down Expand Up @@ -5841,7 +5873,35 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
vInv.push_back(inv);
bool fanout = true;
const auto wtxid = txinfo.tx->GetWitnessHash();
if (reconciles_txs) {
auto txiter = m_mempool.GetIter(txinfo.tx->GetHash());
if (txiter) {
if ((*txiter)->GetCountWithDescendants() > 1) {
// If a transaction has in-mempool children, always fanout it.
// Until package relay is implemented, this is needed to avoid
// breaking parent+child relay expectations in some cases.
//
// Potentially reconciling parent+child would mean that for every
// child we need to to check if any of the parents is currently
// reconciled so that the child isn't fanouted ahead. But then
// it gets tricky when reconciliation sets are full: a) the child
// can't just be added; b) removing parents from reconciliation
// sets for this one child is not good either.
fanout = true;
} else {
auto fanout_randomizer = m_connman.GetDeterministicRandomizer(RANDOMIZER_ID_FANOUTTARGET);
fanout = m_txreconciliation->ShouldFanoutTo(wtxid, fanout_randomizer, pto->GetId(),
inbounds_nonrcncl_tx_relay, outbounds_nonrcncl_tx_relay);
}
}
}

if (fanout || !m_txreconciliation->AddToSet(pto->GetId(), wtxid)) {
vInv.push_back(inv);
}

nRelayedTransactions++;
if (vInv.size() == MAX_INV_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
Expand All @@ -5851,7 +5911,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}

// Ensure we'll respond to GETDATA requests for anything we've just announced
LOCK(m_mempool.cs);
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
}
}
Expand Down
104 changes: 104 additions & 0 deletions src/node/txreconciliation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include <common/system.h>
#include <logging.h>
#include <util/check.h>
#include <util/hasher.h>

#include <cmath>
#include <unordered_map>
#include <variant>

Expand All @@ -17,6 +19,16 @@ namespace {
/** Static salt component used to compute short txids for sketch construction, see BIP-330. */
const std::string RECON_STATIC_SALT = "Tx Relay Salting";
const HashWriter RECON_SALT_HASHER = TaggedHash(RECON_STATIC_SALT);
/**
* Announce transactions via full wtxid to a limited number of inbound and outbound peers.
* Justification for these values are provided here:
* https://github.com/naumenkogs/txrelaysim/issues/7#issuecomment-902165806 */
constexpr double INBOUND_FANOUT_DESTINATIONS_FRACTION = 0.1;
constexpr size_t OUTBOUND_FANOUT_DESTINATIONS = 1;
/**
* Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of
* reconciliation sets and short ids mappings, and CPU used for sketch computation.
*/
constexpr size_t MAX_SET_SIZE = 3000;
/**
* Salt (specified by BIP-330) constructed from contributions from both peers. It is used
Expand Down Expand Up @@ -73,6 +85,11 @@ class TxReconciliationTracker::Impl
private:
mutable Mutex m_txreconciliation_mutex;

/**
* ReconciliationTracker-wide randomness to choose fanout targets for a given txid.
*/
const SaltedTxidHasher m_txid_hasher;

// Local protocol version
uint32_t m_recon_version;

Expand Down Expand Up @@ -193,6 +210,86 @@ class TxReconciliationTracker::Impl
LOCK(m_txreconciliation_mutex);
return IsPeerRegistered(peer_id);
}

std::vector<NodeId> GetFanoutTargets(CSipHasher& deterministic_randomizer_with_wtxid,
bool we_initiate, double limit) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
// The algorithm works as follows. We iterate through the peers (of a given direction)
// hashing them with the given wtxid, and sort them by this hash.
// We then consider top `limit` peers to be low-fanout flood targets.
// The randomness should be seeded with wtxid to return consistent results for every call.

double integer_part;
double fractional_peer = std::modf(limit, &integer_part);
// Handle fractional value.
const bool add_extra = deterministic_randomizer_with_wtxid.Finalize() > fractional_peer * double(UINT64_MAX);
const size_t targets = add_extra ? size_t(integer_part): size_t(integer_part) + 1;

auto cmp_by_key = [](const std::pair<uint64_t, NodeId>& left, const std::pair<uint64_t, NodeId>& right) {
return left.first > right.first;
};

std::set<std::pair<uint64_t, NodeId>, decltype(cmp_by_key)> best_peers(cmp_by_key);

for (auto indexed_state : m_states) {
const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state && cur_state->m_we_initiate == we_initiate) {
uint64_t hash_key = deterministic_randomizer_with_wtxid.Write(cur_state->m_k0).Finalize();
best_peers.insert(std::make_pair(hash_key, indexed_state.first));
}
}

std::vector<NodeId> result;
auto it = best_peers.begin();
for (size_t i = 0; i < targets && it != best_peers.end(); ++i, ++it) {
result.push_back(it->second);
}
return result;
}

bool ShouldFanoutTo(const uint256& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
if (!IsPeerRegistered(peer_id)) return true;
// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
deterministic_randomizer.Write(wtxid.GetUint64(0));
const auto& recon_state = std::get<TxReconciliationState>(m_states.find(peer_id)->second);
// We decide whether a particular peer is a low-fanout flood target differently
// based on its connection direction:
// - for outbounds we have a fixed number of flood destinations;
// - for inbounds we use a fraction of all inbound peers supporting tx relay.
//
// We first decide how many reconciling peers of a given direction we want to flood to,
// and then generate a list of peers of that size for a given transaction. We then see
// whether the given peer falls into this list.
double destinations;
if (recon_state.m_we_initiate) {
destinations = OUTBOUND_FANOUT_DESTINATIONS - outbounds_nonrcncl_tx_relay;
} else {
const size_t inbound_rcncl_peers = std::count_if(m_states.begin(), m_states.end(),
[](std::pair<NodeId, std::variant<uint64_t, TxReconciliationState>> indexed_state) {
const auto* cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state) return !cur_state->m_we_initiate;
return false;
});

// Since we use the fraction for inbound peers, we first need to compute the total
// number of inbound targets.
const double inbound_targets = (inbounds_nonrcncl_tx_relay + inbound_rcncl_peers) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
destinations = inbound_targets - inbounds_nonrcncl_tx_relay;
}

if (destinations < 0.01) {
return false;
}

auto fanout_candidates = GetFanoutTargets(deterministic_randomizer, recon_state.m_we_initiate, destinations);
return std::count(fanout_candidates.begin(), fanout_candidates.end(), peer_id);
}
};

TxReconciliationTracker::TxReconciliationTracker(uint32_t recon_version) : m_impl{std::make_unique<TxReconciliationTracker::Impl>(recon_version)} {}
Expand Down Expand Up @@ -229,3 +326,10 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
{
return m_impl->IsPeerRegisteredExternal(peer_id);
}

bool TxReconciliationTracker::ShouldFanoutTo(const uint256& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const
{
return m_impl->ShouldFanoutTo(wtxid, deterministic_randomizer, peer_id,
inbounds_nonrcncl_tx_relay, outbounds_nonrcncl_tx_relay);
}
6 changes: 6 additions & 0 deletions src/node/txreconciliation.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class TxReconciliationTracker
* Check if a peer is registered to reconcile transactions with us.
*/
bool IsPeerRegistered(NodeId peer_id) const;

/**
* Returns whether the peer is chosen as a low-fanout destination for a given tx.
*/
bool ShouldFanoutTo(const uint256& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const;
};

#endif // BITCOIN_NODE_TXRECONCILIATION_H
71 changes: 71 additions & 0 deletions src/test/txreconciliation_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,75 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}

BOOST_AUTO_TEST_CASE(ShouldFanoutToTest)
{
TxReconciliationTracker tracker(1);
NodeId peer_id0 = 0;
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);

// If peer is not registered for reconciliation, it should be always chosen for flooding.
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0));
}

tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
// Same after pre-registering.
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0));
}

// Once the peer is registered, it should be selected for flooding of some transactions.
// Since there is only one reconciling peer, it will be selected for all transactions.
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, /*is_peer_inbound=*/false, 1, 1), ReconciliationRegisterResult::SUCCESS);
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0));
}

// Don't select a fanout target if it was already fanouted sufficiently.
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(!tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/1));
}

tracker.ForgetPeer(peer_id0);
// A forgotten (reconciliation-wise) peer should be always selected for fanout again.
for (int i = 0; i < 100; ++i) {
BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0));
}

// Now for inbound connections.
for (int i = 1; i < 31; ++i) {
tracker.PreRegisterPeer(i);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(i, /*is_peer_inbound=*/true, 1, 1), ReconciliationRegisterResult::SUCCESS);
}

// Relay to a fraction of registered inbound peers.
for (int j = 0; j < 100; ++j) {
size_t total_fanouted = 0;
auto wtxid = GetRandHash();
for (int i = 1; i < 31; ++i) {
total_fanouted += tracker.ShouldFanoutTo(wtxid, hasher, i,
/*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0);
}
BOOST_CHECK_EQUAL(total_fanouted, 3);
}

// Don't relay if there is sufficient non-reconciling peers
for (int j = 0; j < 100; ++j) {
size_t total_fanouted = 0;
for (int i = 1; i < 31; ++i) {
total_fanouted += tracker.ShouldFanoutTo(GetRandHash(), hasher, i,
/*inbounds_nonrcncl_tx_relay=*/4, /*outbounds_nonrcncl_tx_relay=*/0);
}
BOOST_CHECK_EQUAL(total_fanouted, 0);
}

}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 983f8c6

Please sign in to comment.