Skip to content

Commit

Permalink
Always accept incoming connections from trusted peers (paradigmxyz#7140)
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerZheng authored and Ruteri committed Apr 17, 2024
1 parent 225de5f commit 4445cf7
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 61 deletions.
58 changes: 45 additions & 13 deletions crates/net/network/src/peers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,14 @@ impl PeersManager {

/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
/// we have reached our limit for max inbound connections
/// returns an error if the inbound ip address is on the ban list
pub(crate) fn on_incoming_pending_session(
&mut self,
addr: IpAddr,
) -> Result<(), InboundConnectionError> {
if self.ban_list.is_banned_ip(&addr) {
return Err(InboundConnectionError::IpBanned)
}
if !self.connection_info.has_in_capacity() {
return Err(InboundConnectionError::ExceedsLimit(self.connection_info.max_inbound))
}
// keep track of new connection
self.connection_info.inc_in();
Ok(())
}
Expand Down Expand Up @@ -284,6 +279,14 @@ impl PeersManager {
return
}
value.state = PeerConnectionState::In;
// if a peer is not trusted and we don't have capacity for more inbound connections,
// disconnecting the peer
if !value.is_trusted() && !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
});
}
}
Entry::Vacant(entry) => {
// peer is missing in the table, we add it but mark it as to be removed after
Expand All @@ -292,6 +295,14 @@ impl PeersManager {
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));

// disconnect the peer if we don't have capacity for more inbound connections
if !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
});
}
}
}
}
Expand Down Expand Up @@ -573,7 +584,10 @@ impl PeersManager {
/// to us at the same time and this connection is already established.
pub(crate) fn on_already_connected(&mut self, direction: Direction) {
match direction {
Direction::Incoming => {}
Direction::Incoming => {
// need to decrement the ingoing counter
self.connection_info.decr_in();
}
Direction::Outgoing(_) => {
// need to decrement the outgoing counter
self.connection_info.decr_out();
Expand Down Expand Up @@ -875,7 +889,7 @@ impl ConnectionInfo {

/// Returns `true` if there's still capacity for a new incoming connection.
fn has_in_capacity(&self) -> bool {
self.num_inbound < self.max_inbound
self.num_inbound <= self.max_inbound
}

fn decr_state(&mut self, state: PeerConnectionState) {
Expand Down Expand Up @@ -1420,7 +1434,6 @@ impl Default for PeerBackoffDurations {

#[derive(Debug, Error)]
pub enum InboundConnectionError {
ExceedsLimit(usize),
IpBanned,
}

Expand Down Expand Up @@ -1449,7 +1462,7 @@ mod tests {
DisconnectReason,
};
use reth_net_common::ban_list::BanList;
use reth_network_api::ReputationChangeKind;
use reth_network_api::{Direction, ReputationChangeKind};
use reth_primitives::{PeerId, B512};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -1989,6 +2002,28 @@ mod tests {
}
}

#[tokio::test]
async fn test_already_connected() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(peer, socket_addr);

// peer should have been added and num_inbound should have been increased
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_inbound, 1);

assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_already_connected(Direction::Incoming);

// peer should not be connected and num_inbound should not have been increased
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_inbound, 1);
}

#[tokio::test]
async fn test_reputation_change_trusted_peer() {
let peer = PeerId::random();
Expand Down Expand Up @@ -2166,9 +2201,6 @@ mod tests {
Ok(_) => panic!(),
Err(err) => match err {
super::InboundConnectionError::IpBanned {} => {}
super::InboundConnectionError::ExceedsLimit { .. } => {
panic!()
}
},
}
}
Expand Down
39 changes: 0 additions & 39 deletions crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ pub use handle::{
use reth_eth_wire::multiplex::RlpxProtocolMultiplexer;
pub use reth_network_api::{Direction, PeerInfo};

/// Maximum allowed graceful disconnects at a time.
const MAX_GRACEFUL_DISCONNECTS: usize = 15;

/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
Expand Down Expand Up @@ -113,8 +110,6 @@ pub struct SessionManager {
bandwidth_meter: BandwidthMeter,
/// Metrics for the session manager.
metrics: SessionManagerMetrics,
/// Tracks the number of active graceful disconnects for incoming connections.
graceful_disconnects_counter: GracefulDisconnects,
}

// === impl SessionManager ===
Expand Down Expand Up @@ -156,7 +151,6 @@ impl SessionManager {
bandwidth_meter,
extra_protocols,
metrics: Default::default(),
graceful_disconnects_counter: Default::default(),
}
}

Expand Down Expand Up @@ -310,27 +304,6 @@ impl SessionManager {
}
}

/// Sends a disconnect message to the peer with the given [DisconnectReason].
pub(crate) fn disconnect_incoming_connection(
&mut self,
stream: TcpStream,
reason: DisconnectReason,
) {
let counter = self.graceful_disconnects_counter.clone();
if counter.exceeds_limit() {
// simply drop the connection if there are too many active disconnects already
return
}
let secret_key = self.secret_key;

self.spawn(async move {
if let Ok(stream) = get_eciess_stream(stream, secret_key, Direction::Incoming).await {
let _ = UnauthedP2PStream::new(stream).send_disconnect(reason).await;
}
drop(counter)
});
}

/// Initiates a shutdown of all sessions.
///
/// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
Expand Down Expand Up @@ -635,18 +608,6 @@ impl SessionManager {
}
}

/// Keep track of graceful disconnects for incoming connections.
#[derive(Debug, Clone, Default)]
struct GracefulDisconnects(Arc<()>);

impl GracefulDisconnects {
/// Returns true if the number of graceful disconnects exceeds the limit
/// [MAX_GRACEFUL_DISCONNECTS]
fn exceeds_limit(&self) -> bool {
Arc::strong_count(&self.0) > MAX_GRACEFUL_DISCONNECTS
}
}

/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent {
Expand Down
11 changes: 2 additions & 9 deletions crates/net/network/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
DisconnectReason, EthVersion, Status,
EthVersion, Status,
};
use reth_primitives::PeerId;
use reth_provider::{BlockNumReader, BlockReader};
Expand All @@ -29,7 +29,7 @@ use tracing::trace;
///
/// A swarm emits [`SwarmEvent`]s when polled.
///
/// The manages the [`ConnectionListener`] and delegates new incoming connections to the
/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
///
Expand Down Expand Up @@ -203,13 +203,6 @@ where
InboundConnectionError::IpBanned => {
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
}
InboundConnectionError::ExceedsLimit(limit) => {
trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; disconnecting");
self.sessions.disconnect_incoming_connection(
stream,
DisconnectReason::TooManyPeers,
);
}
}
return None
}
Expand Down
117 changes: 117 additions & 0 deletions crates/net/network/tests/it/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,123 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
net_handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_always_accept_incoming_connections_from_trusted_peers() {
reth_tracing::init_test_tracing();
let other_peer1 = new_random_peer(10, HashSet::new()).await;
let other_peer2 = new_random_peer(10, HashSet::new()).await;
let other_peer3 = new_random_peer(0, HashSet::new()).await;

// setup the peer with max_inbound = 1, and add other_peer_3 as trust nodes
let peer = new_random_peer(
1,
HashSet::from([NodeRecord::new(other_peer3.local_addr(), *other_peer3.peer_id())]),
)
.await;

let handle = peer.handle().clone();
let other_peer_handle1 = other_peer1.handle().clone();
let other_peer_handle2 = other_peer2.handle().clone();
let other_peer_handle3 = other_peer3.handle().clone();

tokio::task::spawn(peer);
tokio::task::spawn(other_peer1);
tokio::task::spawn(other_peer2);
tokio::task::spawn(other_peer3);

let mut events = NetworkEventStream::new(handle.event_listener());
let mut events2 = NetworkEventStream::new(other_peer_handle2.event_listener());

// though we added other_peer3 as a trust node, the incoming connection should fail because
// peer3 doesn't allow inbound connections
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// incoming connection should succeed
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle1.peer_id());
assert_eq!(handle.num_connected_peers(), 1);

// incoming connection should fail because exceeding max_inbound
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());
// fixme: this should be `Some(DisconnectReason::TooManyPeers)` but `None`
assert_eq!(reason, None);

let (peer_id, reason) = events2.next_session_closed().await.unwrap();
assert_eq!(peer_id, *handle.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// outbound connection from `other_peer3` should succeed
other_peer_handle3.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());

// sleep is needed because the disconnect event happened after session_established event
tokio::time::sleep(Duration::from_secs(3)).await;
assert_eq!(handle.num_connected_peers(), 2);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rejected_by_already_connect() {
reth_tracing::init_test_tracing();
let other_peer1 = new_random_peer(10, HashSet::new()).await;
let other_peer2 = new_random_peer(10, HashSet::new()).await;

// setup the peer with max_inbound = 2
let peer = new_random_peer(2, HashSet::new()).await;

let handle = peer.handle().clone();
let other_peer_handle1 = other_peer1.handle().clone();
let other_peer_handle2 = other_peer2.handle().clone();

tokio::task::spawn(peer);
tokio::task::spawn(other_peer1);
tokio::task::spawn(other_peer2);

let mut events = NetworkEventStream::new(handle.event_listener());

// incoming connection should succeed
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle1.peer_id());
assert_eq!(handle.num_connected_peers(), 1);

// incoming connection from the same peer should be rejected by already connected
// and num_inbount should still be 1
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
tokio::time::sleep(Duration::from_secs(1)).await;

// incoming connection from other_peer2 should succeed
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());

// wait 2 seconds and check that other_peer2 is not rejected by TooManyPeers
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 2);
}

async fn new_random_peer(
max_in_bound: usize,
trusted_nodes: HashSet<NodeRecord>,
) -> NetworkManager<NoopProvider> {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let peers_config =
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);

let config = NetworkConfigBuilder::new(secret_key)
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)
.build(NoopProvider::default());

NetworkManager::new(config).await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_connect_many() {
reth_tracing::init_test_tracing();
Expand Down

0 comments on commit 4445cf7

Please sign in to comment.