Skip to content

Commit

Permalink
Rate limit the processing of incoming addr messages
Browse files Browse the repository at this point in the history
Summary:
> While limitations on the influence of attackers on addrman already
> exist (affected buckets are restricted to a subset based on incoming
> IP / network group), there is no reason to permit them to let them
> feed us addresses at more than a multiple of the normal network
> rate.
>
> This commit introduces a "token bucket" rate limiter for the
> processing of addresses in incoming ADDR and ADDRV2 messages.
> Every connection gets an associated token bucket. Processing an
> address in an ADDR or ADDRV2 message from non-whitelisted peers
> consumes a token from the bucket. If the bucket is empty, the
> address is ignored (it is not forwarded or processed). The token
> counter increases at a rate of 0.1 tokens per second, and will
> accrue up to a maximum of 1000 tokens (the maximum we accept in a
> single ADDR or ADDRV2). When a GETADDR is sent to a peer, it
> immediately gets 1000 additional tokens, as we actively desire many
> addresses from such peers (this may temporarily cause the token
> count to exceed 1000).
>
> The rate limit of 0.1 addr/s was chosen based on observation of
> honest nodes on the network. Activity in general from most nodes
> is either 0, or up to a maximum around 0.025 addr/s for recent
> Bitcoin Core nodes. A few (self-identified, through subver) crawler
> nodes occasionally exceed 0.1 addr/s.

> Randomize the order of addr processing

> Functional tests for addr rate limiting

> Add logging and addr rate limiting statistics
>
> Includes logging improvements by Vasil Dimov and John Newbery.

> Improve addr relay tests using statistics

Note: this backport bypasses an intermediate change in `p2p_addrv2_relay.py` from [[bitcoin/bitcoin#22211 | core#22211]]

This is a backport of [[bitcoin/bitcoin#22387 | core#22387]]

Test Plan:
`ninja all check-all`
IBD tests

Reviewers: #bitcoin_abc, Fabien

Reviewed By: #bitcoin_abc, Fabien

Subscribers: Fabien

Differential Revision: https://reviews.bitcoinabc.org/D10941
  • Loading branch information
sipa authored and PiRK committed Feb 1, 2022
1 parent 216ecee commit a1509f7
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 9 deletions.
3 changes: 3 additions & 0 deletions doc/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ This release includes the following features and fixes:
- Add a `-fixedseeds` option which can be set to 0 to disable the hardcoded seeds.
This can be used in conjunction with `dsnssed=0` to create a trusted peer only setup.
In this case the nodes need to be added manually with the `-addnode` option or the `addnode` RPC.
- The node will now limit the rate at which the addresses received via p2p messages are processed.
This can be bypassed if needed by granting the `addr` permission to a peer(see the `-whitelist`
option for details).
3 changes: 2 additions & 1 deletion src/net_permissions.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ enum NetPermissionFlags {
PF_NOBAN = (1U << 4) | PF_DOWNLOAD,
// Can query the mempool
PF_MEMPOOL = (1U << 5),
// Can request addrs without hitting a privacy-preserving cache
// Can request addrs without hitting a privacy-preserving cache, and send us
// unlimited amounts of addrs.
PF_ADDR = (1U << 7),
// Bypass the limit on how many proof INVs are tracked from this peer as
// well as the delay penalty when reaching the the in-flight requests limit
Expand Down
69 changes: 69 additions & 0 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ static constexpr uint32_t MAX_GETCFHEADERS_SIZE = 2000;
static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23;
/** The maximum number of address records permitted in an ADDR message. */
static constexpr size_t MAX_ADDR_TO_SEND{1000};
/**
* The maximum rate of address records we're willing to process on average. Can
* be bypassed using the NetPermissionFlags::Addr permission.
*/
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
/**
* The soft limit of the address processing token bucket (the regular
* MAX_ADDR_RATE_PER_SECOND based increments won't go above this, but the
* MAX_ADDR_TO_SEND increment following GETADDR is exempt from this limit.
*/
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};

inline size_t GetMaxAddrToSend() {
return gArgs.GetArg("-maxaddrtosend", MAX_ADDR_TO_SEND);
Expand Down Expand Up @@ -413,6 +424,21 @@ struct Peer {
std::atomic_bool m_wants_addrv2{false};
/** Whether this peer has already sent us a getaddr message. */
bool m_getaddr_recvd{false};
/**
* Number of addr messages that can be processed from this peer. Start at 1
* to permit self-announcement.
*/
double m_addr_token_bucket{1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{
GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/**
* Total number of addresses that were processed (excludes rate limited
* ones).
*/
std::atomic<uint64_t> m_addr_processed{0};

/**
* Set of txids to reconsider once their parent transactions have been
Expand Down Expand Up @@ -1626,6 +1652,8 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
}

stats.m_ping_wait = ping_wait;
stats.m_addr_processed = peer->m_addr_processed.load();
stats.m_addr_rate_limited = peer->m_addr_rate_limited.load();
stats.m_addr_relay_enabled = peer->m_addr_relay_enabled.load();

return true;
Expand Down Expand Up @@ -3511,6 +3539,10 @@ void PeerManagerImpl::ProcessMessage(
m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version)
.Make(NetMsgType::GETADDR));
peer->m_getaddr_sent = true;
// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND
// addresses in response (bypassing the
// MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
peer->m_addr_token_bucket += GetMaxAddrToSend();
}

if (!pfrom.IsInboundConn()) {
Expand Down Expand Up @@ -3674,11 +3706,40 @@ void PeerManagerImpl::ProcessMessage(
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// Update/increment addr rate limiting bucket.
const auto current_time = GetTime<std::chrono::microseconds>();
if (peer->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff =
std::max(current_time - peer->m_addr_token_timestamp, 0us);
const double increment =
CountSecondsDouble(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer->m_addr_token_bucket =
std::min<double>(peer->m_addr_token_bucket + increment,
MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer->m_addr_token_timestamp = current_time;

const bool rate_limited =
!pfrom.HasPermission(NetPermissionFlags::PF_ADDR);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());
for (CAddress &addr : vAddr) {
if (interruptMsgProc) {
return;
}

// Apply rate limiting.
if (rate_limited) {
if (peer->m_addr_token_bucket < 1.0) {
++num_rate_limit;
continue;
}
peer->m_addr_token_bucket -= 1.0;
}

// We only bother storing full nodes, though this may include things
// which we would not make an outbound connection to, in part
// because we may make feeler connections to them.
Expand All @@ -3697,6 +3758,7 @@ void PeerManagerImpl::ProcessMessage(
// remembering we received them
continue;
}
++num_proc;
bool fReachable = IsReachable(addr);
if (addr.nTime > nSince && !peer->m_getaddr_sent &&
vAddr.size() <= 10 && addr.IsRoutable()) {
Expand All @@ -3708,6 +3770,13 @@ void PeerManagerImpl::ProcessMessage(
vAddrOk.push_back(addr);
}
}
peer->m_addr_processed += num_proc;
peer->m_addr_rate_limited += num_rate_limit;
LogPrint(BCLog::NET,
"Received addr: %u addresses (%u processed, %u rate-limited) "
"from peer=%d%s\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId(),
fLogIPs ? ", peeraddr=" + pfrom.addr.ToString() : "");

m_connman.AddNewAddresses(vAddrOk, pfrom.addr, 2 * 60 * 60);
if (vAddr.size() < 1000) {
Expand Down
2 changes: 2 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ struct CNodeStateStats {
int m_starting_height = -1;
std::chrono::microseconds m_ping_wait;
std::vector<int> vHeightInFlight;
uint64_t m_addr_processed = 0;
uint64_t m_addr_rate_limited = 0;
bool m_addr_relay_enabled{false};
};

Expand Down
3 changes: 3 additions & 0 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ static RPCHelpMan getpeerinfo() {
heights.push_back(height);
}
obj.pushKV("inflight", heights);
obj.pushKV("addr_processed", statestats.m_addr_processed);
obj.pushKV("addr_rate_limited",
statestats.m_addr_rate_limited);
}
if (IsDeprecatedRPCEnabled(gArgs, "whitelisted")) {
// whitelisted is deprecated in v0.24.7 for removal in v0.25
Expand Down
111 changes: 109 additions & 2 deletions test/functional/p2p_addr_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Test addr relay
"""

import random
import time

from test_framework.messages import (
Expand All @@ -15,14 +16,15 @@
msg_getaddr,
msg_verack,
)
from test_framework.p2p import P2PInterface
from test_framework.p2p import P2PInterface, p2p_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_greater_than


class AddrReceiver(P2PInterface):
num_ipv4_received = 0
test_addr_contents = False
_tokens = 1
send_getaddr = True

def __init__(self, test_addr_contents=False, send_getaddr=True):
Expand All @@ -42,6 +44,22 @@ def on_addr(self, message):
"Invalid addr.port of {} (8333-8342 expected)".format(addr.port))
assert addr.ip.startswith('123.123.123.')

def on_getaddr(self, message):
# When the node sends us a getaddr, it increments the addr relay tokens
# for the connection by 1000
self._tokens += 1000

@property
def tokens(self):
with p2p_lock:
return self._tokens

def increment_tokens(self, n):
# When we move mocktime forward, the node increments the addr relay
# tokens for its peers
with p2p_lock:
self._tokens += n

def addr_received(self):
return self.num_ipv4_received != 0

Expand All @@ -60,6 +78,7 @@ class AddrTest(BitcoinTestFramework):

def set_test_params(self):
self.num_nodes = 1
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.oversized_addr_test()
Expand All @@ -70,6 +89,7 @@ def run_test(self):
# in subsequent tests
self.getaddr_tests()
self.blocksonly_mode_tests()
self.rate_limit_tests()

def setup_addr_msg(self, num):
addrs = []
Expand All @@ -86,6 +106,22 @@ def setup_addr_msg(self, num):
msg.addrs = addrs
return msg

def setup_rand_addr_msg(self, num):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + i
addr.nServices = NODE_NETWORK
addr.ip = (
f"{random.randrange(128,169)}.{random.randrange(1,255)}"
f".{random.randrange(1,255)}.{random.randrange(1,255)}"
)
addr.port = 8333
addrs.append(addr)
msg = msg_addr()
msg.addrs = addrs
return msg

def send_addr_msg(self, source, msg, receivers):
source.send_and_ping(msg)
# pop m_next_addr_send timer
Expand Down Expand Up @@ -305,7 +341,7 @@ def getaddr_tests(self):

def blocksonly_mode_tests(self):
self.log.info('Test addr relay in -blocksonly mode')
self.restart_node(0, ["-blocksonly"])
self.restart_node(0, ["-blocksonly", "-whitelist=addr@127.0.0.1"])
self.mocktime = int(time.time())

self.log.info('Check that we send getaddr messages')
Expand All @@ -322,6 +358,77 @@ def blocksonly_mode_tests(self):

self.nodes[0].disconnect_p2ps()

def send_addrs_and_test_rate_limiting(self, peer, no_relay, new_addrs,
total_addrs):
"""Send an addr message and check that the number of addresses processed
and rate-limited is as expected
"""
peer.send_and_ping(self.setup_rand_addr_msg(new_addrs))

peerinfo = self.nodes[0].getpeerinfo()[0]
addrs_processed = peerinfo['addr_processed']
addrs_rate_limited = peerinfo['addr_rate_limited']
self.log.debug(f"addrs_processed = {addrs_processed}, "
f"addrs_rate_limited = {addrs_rate_limited}")

if no_relay:
assert_equal(addrs_processed, 0)
assert_equal(addrs_rate_limited, 0)
else:
assert_equal(addrs_processed, min(total_addrs, peer.tokens))
assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens))

def rate_limit_tests(self):
self.mocktime = int(time.time())
self.restart_node(0, [])
self.nodes[0].setmocktime(self.mocktime)

for contype, no_relay in [
("outbound-full-relay", False),
("block-relay-only", True),
("inbound", False)
]:
self.log.info(
f'Test rate limiting of addr processing for {contype} peers')
if contype == "inbound":
peer = self.nodes[0].add_p2p_connection(AddrReceiver())
else:
peer = self.nodes[0].add_outbound_p2p_connection(
AddrReceiver(), p2p_idx=0, connection_type=contype)

# Send 600 addresses. For all but the block-relay-only peer this
# should result in addresses being processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 600)

# Send 600 more addresses. For the outbound-full-relay peer (which
# we send a GETADDR, and thus will process up to 1001 incoming
# addresses), this means more addresses will be processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 1200)

# Send 10 more. As we reached the processing limit for all nodes,
# no more addresses should be procesesd.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 10, 1210)

# Advance the time by 100 seconds, permitting the processing of 10
# more addresses.
# Send 200 and verify that 10 are processed.
self.mocktime += 100
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(10)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1410)

# Advance the time by 1000 seconds, permitting the processing of 100
# more addresses.
# Send 200 and verify that 100 are processed.
self.mocktime += 1000
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(100)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1610)

self.nodes[0].disconnect_p2ps()


if __name__ == '__main__':
AddrTest().main()
11 changes: 5 additions & 6 deletions test/functional/p2p_addrv2_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from test_framework.messages import NODE_NETWORK, CAddress, msg_addrv2
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal

ADDRS = []
for i in range(10):
Expand All @@ -30,11 +29,10 @@ def __init__(self):
super().__init__(support_addrv2=True)

def on_addrv2(self, message):
for addr in message.addrs:
assert_equal(addr.nServices, NODE_NETWORK)
assert addr.ip.startswith('123.123.123.')
assert (8333 <= addr.port < 8343)
self.addrv2_received_and_checked = True
expected_set = set((addr.ip, addr.port) for addr in ADDRS)
received_set = set((addr.ip, addr.port) for addr in message.addrs)
if expected_set == received_set:
self.addrv2_received_and_checked = True

def wait_for_addrv2(self):
self.wait_until(lambda: "addrv2" in self.last_message)
Expand All @@ -44,6 +42,7 @@ class AddrTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.log.info('Create connection that sends addrv2 messages')
Expand Down
1 change: 1 addition & 0 deletions test/functional/p2p_invalid_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class InvalidMessagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.test_buffer()
Expand Down

0 comments on commit a1509f7

Please sign in to comment.