Skip to content

Commit

Permalink
Merge d3474b8 into merged_master (Bitcoin PR bitcoin/bitcoin#22387)
Browse files Browse the repository at this point in the history
  • Loading branch information
apoelstra committed Aug 3, 2021
2 parents b387aac + d3474b8 commit d4c7159
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/net_permissions.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ enum class NetPermissionFlags : uint32_t {
NoBan = (1U << 4) | Download,
// Can query the mempool
Mempool = (1U << 5),
// Can request addrs without hitting a privacy-preserving cache
// Can request addrs without hitting a privacy-preserving cache, and send us
// unlimited amounts of addrs.
Addr = (1U << 7),

// True if the user did not specifically set fine grained permissions
Expand Down
54 changes: 54 additions & 0 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ static constexpr uint32_t MAX_GETCFHEADERS_SIZE = 2000;
static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23;
/** The maximum number of address records permitted in an ADDR message. */
static constexpr size_t MAX_ADDR_TO_SEND{1000};
/** The maximum rate of address records we're willing to process on average. Can be bypassed using
* the NetPermissionFlags::Addr permission. */
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
* is exempt from this limit. */
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};

// Internal stuff
namespace {
Expand Down Expand Up @@ -233,6 +240,15 @@ struct Peer {
std::atomic_bool m_wants_addrv2{false};
/** Whether this peer has already sent us a getaddr message. */
bool m_getaddr_recvd{false};
/** Number of addr messages that can be processed from this peer. Start at 1 to
* permit self-announcement. */
double m_addr_token_bucket{1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate limited ones). */
std::atomic<uint64_t> m_addr_processed{0};

/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
Expand Down Expand Up @@ -1239,6 +1255,8 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
}

stats.m_ping_wait = ping_wait;
stats.m_addr_processed = peer->m_addr_processed.load();
stats.m_addr_rate_limited = peer->m_addr_rate_limited.load();

return true;
}
Expand Down Expand Up @@ -2583,6 +2601,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Get recent addresses
m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::GETADDR));
peer->m_getaddr_sent = true;
// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
peer->m_addr_token_bucket += MAX_ADDR_TO_SEND;
}

if (!pfrom.IsInboundConn()) {
Expand Down Expand Up @@ -2777,11 +2798,34 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
std::vector<CAddress> vAddrOk;
int64_t nNow = GetAdjustedTime();
int64_t nSince = nNow - 10 * 60;

// Update/increment addr rate limiting bucket.
const auto current_time = GetTime<std::chrono::microseconds>();
if (peer->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer->m_addr_token_timestamp, 0us);
const double increment = CountSecondsDouble(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer->m_addr_token_bucket = std::min<double>(peer->m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer->m_addr_token_timestamp = current_time;

const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;

// Apply rate limiting.
if (rate_limited) {
if (peer->m_addr_token_bucket < 1.0) {
++num_rate_limit;
continue;
}
peer->m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
Expand All @@ -2795,6 +2839,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
bool fReachable = IsReachable(addr);
if (addr.nTime > nSince && !peer->m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
Expand All @@ -2804,6 +2849,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
if (fReachable)
vAddrOk.push_back(addr);
}
peer->m_addr_processed += num_proc;
peer->m_addr_rate_limited += num_rate_limit;
LogPrint(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d%s\n",
vAddr.size(),
num_proc,
num_rate_limit,
pfrom.GetId(),
fLogIPs ? ", peeraddr=" + pfrom.addr.ToString() : "");

m_addrman.Add(vAddrOk, pfrom.addr, 2 * 60 * 60);
if (vAddr.size() < 1000) peer->m_getaddr_sent = false;
if (pfrom.IsAddrFetchConn()) {
Expand Down
2 changes: 2 additions & 0 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct CNodeStateStats {
int m_starting_height = -1;
std::chrono::microseconds m_ping_wait;
std::vector<int> vHeightInFlight;
uint64_t m_addr_processed = 0;
uint64_t m_addr_rate_limited = 0;
};

class PeerManager : public CValidationInterface, public NetEventsInterface
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ static RPCHelpMan getpeerinfo()
heights.push_back(height);
}
obj.pushKV("inflight", heights);
obj.pushKV("addr_processed", statestats.m_addr_processed);
obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited);
}
UniValue permissions(UniValue::VARR);
for (const auto& permission : NetPermissions::ToStrings(stats.m_permissionFlags)) {
Expand Down
99 changes: 94 additions & 5 deletions test/functional/p2p_addr_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
msg_addr,
msg_getaddr
)
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
from test_framework.p2p import (
P2PInterface,
p2p_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
import random
import time


class AddrReceiver(P2PInterface):
num_ipv4_received = 0
test_addr_contents = False
_tokens = 1

def __init__(self, test_addr_contents=False):
super().__init__()
Expand All @@ -40,6 +43,20 @@ def on_addr(self, message):
raise AssertionError("Invalid addr.port of {} (8333-8342 expected)".format(addr.port))
assert addr.ip.startswith('123.123.123.')

def on_getaddr(self, message):
# When the node sends us a getaddr, it increments the addr relay tokens for the connection by 1000
self._tokens += 1000

@property
def tokens(self):
with p2p_lock:
return self._tokens

def increment_tokens(self, n):
# When we move mocktime forward, the node increments the addr relay tokens for its peers
with p2p_lock:
self._tokens += n

def addr_received(self):
return self.num_ipv4_received != 0

Expand All @@ -53,12 +70,14 @@ class AddrTest(BitcoinTestFramework):

def set_test_params(self):
self.num_nodes = 1
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.oversized_addr_test()
self.relay_tests()
self.getaddr_tests()
self.blocksonly_mode_tests()
self.rate_limit_tests()

def setup_addr_msg(self, num):
addrs = []
Expand All @@ -75,6 +94,19 @@ def setup_addr_msg(self, num):
msg.addrs = addrs
return msg

def setup_rand_addr_msg(self, num):
addrs = []
for i in range(num):
addr = CAddress()
addr.time = self.mocktime + i
addr.nServices = NODE_NETWORK | NODE_WITNESS
addr.ip = f"{random.randrange(128,169)}.{random.randrange(1,255)}.{random.randrange(1,255)}.{random.randrange(1,255)}"
addr.port = 8333
addrs.append(addr)
msg = msg_addr()
msg.addrs = addrs
return msg

def send_addr_msg(self, source, msg, receivers):
source.send_and_ping(msg)
# pop m_next_addr_send timer
Expand Down Expand Up @@ -191,7 +223,7 @@ def getaddr_tests(self):

def blocksonly_mode_tests(self):
self.log.info('Test addr relay in -blocksonly mode')
self.restart_node(0, ["-blocksonly"])
self.restart_node(0, ["-blocksonly", "-whitelist=addr@127.0.0.1"])
self.mocktime = int(time.time())

self.log.info('Check that we send getaddr messages')
Expand All @@ -207,6 +239,63 @@ def blocksonly_mode_tests(self):

self.nodes[0].disconnect_p2ps()

def send_addrs_and_test_rate_limiting(self, peer, no_relay, new_addrs, total_addrs):
"""Send an addr message and check that the number of addresses processed and rate-limited is as expected"""

peer.send_and_ping(self.setup_rand_addr_msg(new_addrs))

peerinfo = self.nodes[0].getpeerinfo()[0]
addrs_processed = peerinfo['addr_processed']
addrs_rate_limited = peerinfo['addr_rate_limited']
self.log.debug(f"addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}")

if no_relay:
assert_equal(addrs_processed, 0)
assert_equal(addrs_rate_limited, 0)
else:
assert_equal(addrs_processed, min(total_addrs, peer.tokens))
assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens))

def rate_limit_tests(self):

self.mocktime = int(time.time())
self.restart_node(0, [])
self.nodes[0].setmocktime(self.mocktime)

for contype, no_relay in [("outbound-full-relay", False), ("block-relay-only", True), ("inbound", False)]:
self.log.info(f'Test rate limiting of addr processing for {contype} peers')
if contype == "inbound":
peer = self.nodes[0].add_p2p_connection(AddrReceiver())
else:
peer = self.nodes[0].add_outbound_p2p_connection(AddrReceiver(), p2p_idx=0, connection_type=contype)

# Send 600 addresses. For all but the block-relay-only peer this should result in addresses being processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 600)

# Send 600 more addresses. For the outbound-full-relay peer (which we send a GETADDR, and thus will
# process up to 1001 incoming addresses), this means more addresses will be processed.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 600, 1200)

# Send 10 more. As we reached the processing limit for all nodes, no more addresses should be procesesd.
self.send_addrs_and_test_rate_limiting(peer, no_relay, 10, 1210)

# Advance the time by 100 seconds, permitting the processing of 10 more addresses.
# Send 200 and verify that 10 are processed.
self.mocktime += 100
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(10)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1410)

# Advance the time by 1000 seconds, permitting the processing of 100 more addresses.
# Send 200 and verify that 100 are processed.
self.mocktime += 1000
self.nodes[0].setmocktime(self.mocktime)
peer.increment_tokens(100)

self.send_addrs_and_test_rate_limiting(peer, no_relay, 200, 1610)

self.nodes[0].disconnect_p2ps()

if __name__ == '__main__':
AddrTest().main()
5 changes: 4 additions & 1 deletion test/functional/p2p_addrv2_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def __init__(self):
super().__init__(support_addrv2 = True)

def on_addrv2(self, message):
if ADDRS == message.addrs:
expected_set = set((addr.ip, addr.port) for addr in ADDRS)
received_set = set((addr.ip, addr.port) for addr in message.addrs)
if expected_set == received_set:
self.addrv2_received_and_checked = True

def wait_for_addrv2(self):
Expand All @@ -53,6 +55,7 @@ class AddrTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.log.info('Create connection that sends addrv2 messages')
Expand Down
1 change: 1 addition & 0 deletions test/functional/p2p_invalid_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class InvalidMessagesTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.extra_args = [["-whitelist=addr@127.0.0.1"]]

def run_test(self):
self.test_buffer()
Expand Down

0 comments on commit d4c7159

Please sign in to comment.