Skip to content

Commit

Permalink
p2p: Cache fanout candidates to optimize txreconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
naumenkogs committed Jan 19, 2024
1 parent 4b9e5dc commit baf722b
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions src/node/txreconciliation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ constexpr size_t OUTBOUND_FANOUT_DESTINATIONS = 1;
*/
constexpr size_t MAX_SET_SIZE = 3000;

/**
* Maximum number of transactions for which we store assigned fanout targets.
*/
constexpr size_t FANOUT_TARGETS_PER_TX_CACHE_SIZE = 3000;
/**
* Salt (specified by BIP-330) constructed from contributions from both peers. It is used
* to compute transaction short IDs, which are then used to construct a sketch representing a set
Expand Down Expand Up @@ -91,7 +95,19 @@ class TxReconciliationTracker::Impl
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);

// Used for randomly choosing fanout targets.
/*
* A least-recently-added cache tracking which peers we should fanout a transaction to.
*
* Since the time between cache accesses is on the order of seconds, returning an outdated
* set of peers is not a concern (especially since we fanout to outbound peers, which should
* be hard to manipulate).
*
* No need to use LRU (bump transaction order upon access) because in most cases
* transactions are processed almost-sequentially.
*/
std::deque<Wtxid> m_tx_fanout_targets_cache_order;
std::map<Wtxid, std::vector<NodeId>> m_tx_fanout_targets_cache_data GUARDED_BY(m_txreconciliation_mutex);
// Used for randomly choosing fanout targets and determines the corresponding cache.
CSipHasher m_deterministic_randomizer;

TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
Expand Down Expand Up @@ -219,8 +235,14 @@ class TxReconciliationTracker::Impl
std::holds_alternative<TxReconciliationState>(recon_state->second));
}

bool IsFanoutTarget(const Wtxid& wtxid, NodeId peer_id, bool we_initiate, double limit) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
// Not const because of caching.
bool IsFanoutTarget(const Wtxid& wtxid, NodeId peer_id, bool we_initiate, double limit) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
auto fanout_candidates = m_tx_fanout_targets_cache_data.find(wtxid);
if (fanout_candidates != m_tx_fanout_targets_cache_data.end()) {
return std::binary_search(fanout_candidates->second.begin(), fanout_candidates->second.end(), peer_id);
}

// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
CSipHasher deterministic_randomizer{m_deterministic_randomizer};
Expand Down Expand Up @@ -248,20 +270,31 @@ class TxReconciliationTracker::Impl
}
}

// Sort by the assigned key.
std::sort(best_peers.begin(), best_peers.end());

auto it = best_peers.begin();
for (size_t i = 0; i < targets_size && it != best_peers.end(); ++i, ++it) {
if (it->second == peer_id) return true;
best_peers.resize(targets_size);

std::vector<NodeId> new_fanout_candidates;
new_fanout_candidates.reserve(targets_size);
for_each(best_peers.begin(), best_peers.end(),
[&new_fanout_candidates](auto& keyed_peer) { new_fanout_candidates.push_back(keyed_peer.second); });

// Sort by NodeId.
std::sort(new_fanout_candidates.begin(), new_fanout_candidates.end());
bool found = std::binary_search(new_fanout_candidates.begin(), new_fanout_candidates.end(), peer_id);

// If the cache is full, make room for the new entry.
if (m_tx_fanout_targets_cache_order.size() == FANOUT_TARGETS_PER_TX_CACHE_SIZE) {
auto expired_tx = m_tx_fanout_targets_cache_order.front();
m_tx_fanout_targets_cache_data.erase(expired_tx);
m_tx_fanout_targets_cache_order.pop_front();
}
return false;
m_tx_fanout_targets_cache_data.emplace(wtxid, std::move(new_fanout_candidates));
m_tx_fanout_targets_cache_order.push_back(wtxid);

return found;
}

/*
* TODO. This is currently not marked const, because it would require adding a const version
* of GetRegisteredPeerState. However, this function becomes non-const in the next commit anyway
* due to caching. Thus, we can save us adding that extra function for now.
*/
bool ShouldFanoutTo(const Wtxid& wtxid, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
Expand Down

0 comments on commit baf722b

Please sign in to comment.