Skip to content

Commit

Permalink
Rate limit the processing of rumoured addresses
Browse files Browse the repository at this point in the history
Set for every instance and fix test peer, remove var
  • Loading branch information
Liquid369 committed May 10, 2023
1 parent e1e376e commit 3717978
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 16 deletions.
15 changes: 15 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ static const unsigned int MAX_INV_SZ = 50000;
static const unsigned int MAX_LOCATOR_SZ = 101;
/** The maximum number of addresses from our addrman to return in response to a getaddr message. */
static constexpr size_t MAX_ADDR_TO_SEND = 1000;
/** The maximum rate of address records we're willing to process on average. */
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
* is exempt from this limit. */
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
/** Maximum length of incoming protocol messages (no message over 2 MiB is currently acceptable). */
static const unsigned int MAX_PROTOCOL_MESSAGE_LENGTH = 2 * 1024 * 1024;
/** Maximum length of strSubVer in `version` message */
Expand Down Expand Up @@ -722,6 +728,15 @@ class CNode
std::set<uint256> setKnown;
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};
/** Number of addresses that can be processed from this peer. Start at 1 to
* permit self-announcement. */
double m_addr_token_bucket{1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate limited ones). */
std::atomic<uint64_t> m_addr_processed{0};

// inventory based relay
CRollingBloomFilter filterInventoryKnown;
Expand Down
54 changes: 52 additions & 2 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
#include "util/validation.h"
#include "validation.h"

#include <chrono>

using namespace std::chrono_literals;

int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last received a block

static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8]
Expand Down Expand Up @@ -220,6 +224,10 @@ struct CNodeState {
int nBlocksInFlight;
//! Whether we consider this a preferred download peer.
bool fPreferredDownload;
//! Addresses processed
uint64_t amt_addr_processed = 0;
//! Addresses rate limited
uint64_t amt_addr_rate_limited = 0;

CNodeBlocks nodeBlocks;

Expand Down Expand Up @@ -494,6 +502,9 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats)
if (queue.pindex)
stats.vHeightInFlight.push_back(queue.pindex->nHeight);
}

stats.m_addr_processed = state->amt_addr_processed;
stats.m_addr_rate_limited = state->amt_addr_rate_limited;
return true;
}

Expand Down Expand Up @@ -1346,6 +1357,9 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
// Get recent addresses
connman->PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
pfrom->fGetAddr = true;
// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
pfrom->m_addr_token_bucket += MAX_ADDR_TO_SEND;
connman->MarkAddressGood(pfrom->addr);
}

Expand Down Expand Up @@ -1494,16 +1508,42 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// Update/increment addr rate limiting bucket.
// TODO: Slight time improvement calculation, continue backporting
const auto current_time = GetTime<std::chrono::microseconds>();
if (pfrom->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - pfrom->m_addr_token_timestamp, 0us);
const double increment = CountSecondsDouble(time_diff) * MAX_ADDR_RATE_PER_SECOND;
pfrom->m_addr_token_bucket = std::min<double>(pfrom->m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
pfrom->m_addr_token_timestamp = current_time;

uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());

for (CAddress& addr : vAddr) {
if (interruptMsgProc)
return true;

// Apply rate limiting.
if (pfrom->m_addr_token_bucket < 1.0) {
++num_rate_limit;
continue;
} else {
pfrom->m_addr_token_bucket -= 1.0;
}

if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
continue;

if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60)
addr.nTime = nNow - 5 * 24 * 60 * 60;
pfrom->AddAddressKnown(addr);
if (connman->IsBanned(addr)) continue; // Do not process banned addresses beyond remembering we received them
++num_proc;
bool fReachable = IsReachable(addr);
if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
Expand All @@ -1513,6 +1553,13 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
if (fReachable)
vAddrOk.push_back(addr);
}
pfrom->m_addr_processed += num_proc;
pfrom->m_addr_rate_limited += num_rate_limit;
CNodeState* state = State(pfrom->GetId());
state->amt_addr_processed += num_proc;
state->amt_addr_rate_limited += num_rate_limit;
LogPrint(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom->GetId());
connman->AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60);
if (vAddr.size() < 1000)
pfrom->fGetAddr = false;
Expand Down Expand Up @@ -1962,8 +2009,11 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
pfrom->vAddrToSend.clear();
std::vector<CAddress> vAddr = connman->GetAddresses(MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND, /* network */ nullopt);
FastRandomContext insecure_rand;
for (const CAddress& addr : vAddr)
pfrom->PushAddress(addr, insecure_rand);
for (const CAddress& addr : vAddr) {
if (!connman->IsBanned(addr)) {
pfrom->PushAddress(addr, insecure_rand);
}
}
}


Expand Down
9 changes: 9 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ struct CNodeStateStats {
int nSyncHeight;
int nCommonHeight;
std::vector<int> vHeightInFlight;
uint64_t m_addr_processed = 0;
uint64_t m_addr_rate_limited = 0;
};

/** Get statistics from node state */
Expand All @@ -72,4 +74,11 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats);
void Misbehaving(NodeId nodeid, int howmuch, const std::string& message="") EXCLUSIVE_LOCKS_REQUIRED(cs_main);
bool IsBanned(NodeId nodeid);


using SecondsDouble = std::chrono::duration<double, std::chrono::seconds::period>;
/**
* Helper to count the seconds in any std::chrono::duration type
*/
inline double CountSecondsDouble(SecondsDouble t) { return t.count(); }

#endif // BITCOIN_NET_PROCESSING_H
4 changes: 4 additions & 0 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
" n, (numeric) The heights of blocks we're currently asking from this peer\n"
" ...\n"
" ]\n"
" \"addr_processed\": n, (numeric) The total number of addresses processed, excluding those dropped due to rate limiting\n"
" \"addr_rate_limited\": n, (numeric) The total number of addresses dropped due to rate limiting\n"
" \"bytessent_per_msg\": {\n"
" \"addr\": n, (numeric) The total bytes sent aggregated by message type\n"
" ...\n"
Expand Down Expand Up @@ -166,6 +168,8 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
heights.push_back(height);
}
obj.pushKV("inflight", heights);
obj.pushKV("addr_processed", statestats.m_addr_processed);
obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited);
}
obj.pushKV("whitelisted", stats.fWhitelisted);

Expand Down
93 changes: 86 additions & 7 deletions test/functional/p2p_addr_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
Test addr relay
"""

import time

from test_framework.messages import (
CAddress,
NODE_NETWORK,
msg_addr,
)
from test_framework.mininode import (
P2PInterface,
mininode_lock
)
from test_framework.test_framework import PivxTestFramework
from test_framework.util import (
assert_equal,
)
from test_framework.util import assert_equal

import random
import time

ADDRS = []
for i in range(10):
Expand All @@ -32,17 +32,33 @@


class AddrReceiver(P2PInterface):
_tokens = 1
def on_addr(self, message):
for addr in message.addrs:
assert_equal(addr.nServices, 1)
assert addr.ip.startswith('123.123.123.')
assert (8333 <= addr.port < 8343)

def on_getaddr(self, message):
# When the node sends us a getaddr, it increments the addr relay tokens for the connection by 1000
self._tokens += 1000

@property
def tokens(self):
with mininode_lock:
return self._tokens

def increment_tokens(self, n):
# When we move mocktime forward, the node increments the addr relay tokens for its peers
with mininode_lock:
self._tokens += n


class AddrTest(PivxTestFramework):
def set_test_params(self):
self.setup_clean_chain = False
self.num_nodes = 1
self.extra_args = [["-whitelist=127.0.0.1"]]

def run_test(self):
self.log.info('Create connection that sends addr messages')
Expand All @@ -58,13 +74,76 @@ def run_test(self):
addr_receiver = self.nodes[0].add_p2p_connection(AddrReceiver())
msg.addrs = ADDRS
with self.nodes[0].assert_debug_log([
'Added 10 addresses from 127.0.0.1: 0 tried',
'received: addr (301 bytes) peer=0',
'sending addr (301 bytes) peer=1',
]):
addr_source.send_and_ping(msg)
self.nodes[0].setmocktime(int(time.time()) + 30 * 60)
addr_receiver.sync_with_ping()
self.rate_limit_tests()

def setup_rand_addr_msg(self, num):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + i
addr.nServices = NODE_NETWORK
addr.ip = f"{random.randrange(128,169)}.{random.randrange(1,255)}.{random.randrange(1,255)}.{random.randrange(1,255)}"
addr.port = 8333
addrs.append(addr)
msg = msg_addr()
msg.addrs = addrs
return msg

def send_addrs_and_test_rate_limiting(self, peer, new_addrs, total_addrs):
"""Send an addr message and check that the number of addresses processed and rate-limited is as expected"""

peer.send_and_ping(self.setup_rand_addr_msg(new_addrs))

peerinfo = self.nodes[0].getpeerinfo()[2]
addrs_processed = peerinfo['addr_processed']
addrs_rate_limited = peerinfo['addr_rate_limited']
self.log.info(f'addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}')
self.log.info(f'peer_tokens = {peer.tokens}')

assert_equal(addrs_processed, min(total_addrs, peer.tokens))
assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens))

def rate_limit_tests(self):

self.mocktime = int(time.time())
self.nodes[0].setmocktime(self.mocktime)

peer = self.nodes[0].add_p2p_connection(AddrReceiver())

self.log.info(f'Test rate limiting of addr processing for inbound peers')

# Send 600 addresses.
self.send_addrs_and_test_rate_limiting(peer, 600, 600)

# Send 400 more addresses.
self.send_addrs_and_test_rate_limiting(peer, 400, 1000)

# Send 10 more. As we reached the processing limit for nodes, no more addresses should be procesesd.
self.send_addrs_and_test_rate_limiting(peer, 10, 1010)

# Advance the time by 100 seconds, permitting the processing of 10 more addresses.
# Send 200 and verify that 10 are processed.
self.mocktime += 100
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(10)

self.send_addrs_and_test_rate_limiting(peer, 200, 1210)

# Advance the time by 1000 seconds, permitting the processing of 100 more addresses.
# Send 200 and verify that 100 are processed.
self.mocktime += 1000
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(100)

self.send_addrs_and_test_rate_limiting(peer, 200, 1410)

self.nodes[0].disconnect_p2ps()



if __name__ == '__main__':
Expand Down
6 changes: 2 additions & 4 deletions test/functional/p2p_addrv2_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def on_addrv2(self, message):
assert_equal(addr.nServices, 1) # NODE_NETWORK
assert addr.ip.startswith('123.123.123.')
assert (8333 <= addr.port < 8343)
self.addrv2_received_and_checked = True
self.addrv2_received_and_checked = True

def wait_for_addrv2(self):
self.wait_until(lambda: "addrv2" in self.last_message)
Expand All @@ -63,14 +63,12 @@ def run_test(self):
addr_receiver = self.nodes[0].add_p2p_connection(AddrReceiver())
msg.addrs = ADDRS
with self.nodes[0].assert_debug_log([
'Added 10 addresses from 127.0.0.1: 0 tried',
'received: addrv2 (131 bytes) peer=0',
'sending addrv2 (131 bytes) peer=1',
'sending addrv2 (14 bytes) peer=1',
]):
addr_source.send_and_ping(msg)
self.nodes[0].setmocktime(int(time.time()) + 30 * 60)
addr_receiver.wait_for_addrv2()

assert addr_receiver.addrv2_received_and_checked


Expand Down
4 changes: 2 additions & 2 deletions test/functional/p2p_invalid_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def test_addrv2_unrecognized_network(self):
self.test_addrv2('unrecognized network',
[
'received: addrv2 (25 bytes)',
'IP 9.9.9.9 mapped',
'Added 1 addresses',
'Received addr: 2 addresses (1 processed, 1 rate-limited)',
'received: ping (8 bytes)',
],
hex_str_to_bytes(
'02' + # number of entries
Expand Down
2 changes: 1 addition & 1 deletion test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
'mempool_persist.py', # ~ 417 sec
'p2p_quorum_connect.py', # ~ 400 sec
'wallet_reorgsrestore.py', # ~ 391 sec
'p2p_addr_relay.py', # ~ 380 sec

# vv Tests less than 5m vv
'wallet_hd.py', # ~ 300 sec
Expand Down Expand Up @@ -135,7 +136,6 @@
'rpc_decodescript.py', # ~ 50 sec
'rpc_blockchain.py', # ~ 50 sec
'wallet_disable.py', # ~ 50 sec
'p2p_addr_relay.py', # ~ 49 sec
'p2p_addrv2_relay.py', # ~ 49 sec
'wallet_autocombine.py', # ~ 49 sec
'mining_v5_upgrade.py', # ~ 48 sec
Expand Down

0 comments on commit 3717978

Please sign in to comment.