diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index ad92e36d4..c81136967 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -14,7 +14,7 @@ use std::{
 };
 use tokio::net::UdpSocket;
 use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
-use tokio::time::timeout;
+use tokio::time::{sleep, timeout};
 use tracing::Instrument;
 
 use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge};
@@ -1275,30 +1275,57 @@ impl P2pConnManager {
             );
         }
 
-        // If we already have a transport channel, reuse it instead of dialing again. This covers
-        // transient->normal promotion without tripping duplicate connection errors.
+        // If a transient transport already exists, promote it without dialing anew.
         if self.connections.contains_key(&peer) {
            tracing::info!(
                tx = %tx,
                remote = %peer,
                courtesy,
-                "connect_peer: reusing existing transport"
+                "connect_peer: reusing existing transport / promoting transient if present"
            );
            let connection_manager = &self.bridge.op_manager.ring.connection_manager;
            if let Some(entry) = connection_manager.drop_transient(&peer) {
                let loc = entry
                    .location
                    .unwrap_or_else(|| Location::from_address(&peer.addr));
-                self.bridge
-                    .op_manager
-                    .ring
-                    .add_connection(loc, peer.clone(), false)
-                    .await;
-                tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient");
+                if connection_manager.should_accept(loc, &peer) {
+                    let current = connection_manager.num_connections();
+                    if current >= connection_manager.max_connections {
+                        tracing::warn!(
+                            tx = %tx,
+                            remote = %peer,
+                            current_connections = current,
+                            max_connections = connection_manager.max_connections,
+                            %loc,
+                            "connect_peer: transient promotion rejected due to capacity"
+                        );
+                    } else {
+                        self.bridge
+                            .op_manager
+                            .ring
+                            .add_connection(loc, peer.clone(), true)
+                            .await;
+                        tracing::info!(
+                            tx = %tx,
+                            remote = %peer,
+                            %loc,
+                            "connect_peer: promoted transient after admission check"
+                        );
+                    }
+                } else {
+                    tracing::warn!(
+                        tx = %tx,
+                        remote = %peer,
+                        %loc,
+                        "connect_peer: transient failed admission on promotion"
+                    );
+                }
            }
+            // Return the remote peer we are connected to (not our own peer key).
+            let resolved_peer_id = peer.clone();
            callback
-                .send_result(Ok((peer.clone(), None)))
+                .send_result(Ok((resolved_peer_id, None)))
                .await
                .inspect_err(|err| {
                    tracing::debug!(
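The promotion path above now re-runs admission via `should_accept` and applies a hard capacity gate before `add_connection`. Below is a minimal sketch of that gate in isolation; `AdmissionGuard` and its fields are hypothetical stand-ins for the relevant `ConnectionManager` state, not the real API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the ConnectionManager state consulted above.
struct AdmissionGuard {
    max_connections: usize,
    open: AtomicUsize,
}

impl AdmissionGuard {
    fn num_connections(&self) -> usize {
        self.open.load(Ordering::Acquire)
    }

    /// A transient may only be promoted while below capacity; at or above
    /// the limit the caller logs a warning and leaves the peer unpromoted.
    fn can_promote(&self) -> bool {
        self.num_connections() < self.max_connections
    }
}

fn main() {
    let guard = AdmissionGuard {
        max_connections: 2,
        open: AtomicUsize::new(2),
    };
    // At capacity: promotion is rejected, mirroring the warn branch above.
    assert!(!guard.can_promote());
}
```

Note that the promotion now passes `true` for `was_reserved`, which balances the reservation that `should_accept` takes when it accepts the peer.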
@@ -1617,9 +1644,14 @@ impl P2pConnManager {
                 current = connection_manager.transient_count(),
                 "Transient connection budget exhausted; dropping inbound connection"
             );
+            if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
+                for mut cb in callbacks {
+                    let _ = cb.send_result(Err(())).await;
+                }
+            }
+            state.awaiting_connection_txs.remove(&peer_id.addr);
             return Ok(());
         }
-
         let pending_txs = state
             .awaiting_connection_txs
             .remove(&peer_id.addr)
@@ -1682,6 +1714,19 @@ impl P2pConnManager {
         // Only insert if connection doesn't already exist to avoid dropping existing channel
         let mut newly_inserted = false;
         if !self.connections.contains_key(&peer_id) {
+            if is_transient {
+                let cm = &self.bridge.op_manager.ring.connection_manager;
+                let current = cm.transient_count();
+                if current >= cm.transient_budget() {
+                    tracing::warn!(
+                        remote = %peer_id.addr,
+                        budget = cm.transient_budget(),
+                        current,
+                        "Transient connection budget exhausted; dropping inbound connection before insert"
+                    );
+                    return Ok(());
+                }
+            }
             let (tx, rx) = mpsc::channel(10);
             tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap");
             self.connections.insert(peer_id.clone(), tx);
@@ -1718,14 +1763,18 @@ impl P2pConnManager {
             let cm = connection_manager.clone();
             let peer = peer_id.clone();
             tokio::spawn(async move {
-                tokio::time::sleep(ttl).await;
+                sleep(ttl).await;
                 if cm.drop_transient(&peer).is_some() {
                     tracing::info!(%peer, "Transient connection expired; dropping");
                     if let Err(err) = drop_tx
                         .send(Right(NodeEvent::DropConnection(peer.clone())))
                         .await
                     {
-                        tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient");
+                        tracing::warn!(
+                            %peer,
+                            ?err,
+                            "Failed to dispatch DropConnection for expired transient"
+                        );
                     }
                 }
             });
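The TTL enforcement above follows a common tokio pattern: spawn one timer task per transient, and route the eventual teardown through the event-loop channel instead of touching connection state from the timer task. A minimal runnable sketch of that pattern, using a plain `String` in place of `NodeEvent::DropConnection` and a boolean stand-in for `drop_transient`:

```rust
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (drop_tx, mut drop_rx) = tokio::sync::mpsc::channel::<String>(8);
    let peer = "peer-a".to_string();
    let ttl = Duration::from_millis(50);

    tokio::spawn(async move {
        sleep(ttl).await;
        // Stand-in for `cm.drop_transient(&peer).is_some()`: the timer only
        // requests a drop if the entry is still tracked when it fires.
        let still_tracked = true;
        if still_tracked {
            if drop_tx.send(format!("drop {peer}")).await.is_err() {
                // Matches the warn branch above: the event loop may be gone.
                eprintln!("event loop closed; DropConnection not dispatched");
            }
        }
    });

    // The receiving side (the event loop in the real code) performs the drop.
    if let Some(event) = drop_rx.recv().await {
        println!("event loop received: {event}");
    }
}
```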
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index 488e62cdf..a1a1cf2b8 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -2,17 +2,19 @@
 use dashmap::DashMap;
 use parking_lot::Mutex;
 use rand::prelude::IndexedRandom;
 use std::collections::{btree_map::Entry, BTreeMap};
+use std::net::SocketAddr;
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
 
 use crate::topology::{Limits, TopologyManager};
 
 use super::*;
-use std::time::{Duration, Instant};
 
+/// Entry tracking a transient connection that hasn't been added to the ring topology yet.
+/// Transient connections are typically unsolicited inbound connections to gateways.
 #[derive(Clone)]
 pub(crate) struct TransientEntry {
-    #[allow(dead_code)]
-    pub opened_at: Instant,
+    /// Advertised location for the transient peer, if known at admission time.
     pub location: Option<Location>,
 }
 
@@ -21,7 +23,6 @@ pub(crate) struct ConnectionManager {
     open_connections: Arc<AtomicUsize>,
     reserved_connections: Arc<AtomicUsize>,
     pub(super) location_for_peer: Arc<RwLock<BTreeMap<PeerId, Location>>>,
-    pending_locations: Arc<RwLock<BTreeMap<PeerId, Location>>>,
     pub(super) topology_manager: Arc<RwLock<TopologyManager>>,
     connections_by_location: Arc<RwLock<BTreeMap<Location, Vec<Connection>>>>,
     /// Interim connections ongoing handshake or successfully open connections
@@ -121,7 +122,6 @@ impl ConnectionManager {
         Self {
             connections_by_location: Arc::new(RwLock::new(BTreeMap::new())),
             location_for_peer: Arc::new(RwLock::new(BTreeMap::new())),
-            pending_locations: Arc::new(RwLock::new(BTreeMap::new())),
             open_connections: Arc::new(AtomicUsize::new(0)),
             reserved_connections: Arc::new(AtomicUsize::new(0)),
             topology_manager,
@@ -164,16 +164,6 @@ impl ConnectionManager {
             "should_accept: evaluating direct acceptance guard"
         );
 
-        if self.has_connection_or_pending(peer_id) {
-            tracing::debug!(
-                %peer_id,
-                open,
-                reserved_before,
-                "Peer already connected; rejecting duplicate reservation"
-            );
-            return false;
-        }
-
         if self.is_gateway && (open > 0 || reserved_before > 0) {
             tracing::info!(
                 %peer_id,
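The gateway branch that follows makes the reservation bookkeeping explicit: a slot is claimed optimistically earlier in `should_accept`, so every rejection path has to release it again. A compact runnable sketch of that reserve-then-undo discipline (the free function and its signature are illustrative, not the real API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;

/// Reserve a slot, then release it if the direct-accept limit rejects the
/// peer. Illustrative stand-in for the gateway guard in `should_accept`.
fn gateway_should_accept(open: usize, reserved: &AtomicUsize) -> bool {
    // Optimistic reservation (taken earlier in the real should_accept,
    // before any guard runs).
    let reserved_before = reserved.fetch_add(1, Ordering::SeqCst);
    if open + reserved_before >= GATEWAY_DIRECT_ACCEPT_LIMIT {
        // Every rejection path must undo the reservation or slots leak.
        reserved.fetch_sub(1, Ordering::SeqCst);
        return false;
    }
    true
}

fn main() {
    let reserved = AtomicUsize::new(0);
    assert!(gateway_should_accept(0, &reserved));
    assert!(gateway_should_accept(0, &reserved));
    // The third direct connection hits the limit and releases its slot.
    assert!(!gateway_should_accept(0, &reserved));
    assert_eq!(reserved.load(Ordering::SeqCst), 2);
}
```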
@@ -230,51 +220,59 @@ impl ConnectionManager {
             }
         };
 
-        let accepted = if open == 0 {
+        if open == 0 {
             tracing::debug!(%peer_id, "should_accept: first connection -> accepting");
-            true
-        } else {
-            const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;
-            if self.is_gateway {
-                let direct_total = open + reserved_before;
-                if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT {
-                    tracing::info!(
-                        %peer_id,
-                        open,
-                        reserved_before,
-                        limit = GATEWAY_DIRECT_ACCEPT_LIMIT,
-                        "Gateway reached direct-accept limit; forwarding join request instead"
-                    );
-                    self.reserved_connections
-                        .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-                    tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead");
-                    return false;
-                }
-            }
-
-            if total_conn < self.min_connections {
-                tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)");
-                true
-            } else if total_conn >= self.max_connections {
-                tracing::info!(%peer_id, total_conn, "should_accept: rejected (max connections reached)");
-                false
-            } else {
-                let accepted = self
-                    .topology_manager
-                    .write()
-                    .evaluate_new_connection(location, Instant::now())
-                    .unwrap_or(true);
+            self.record_pending_location(peer_id, location);
+            return true;
+        }
+        const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;
+        if self.is_gateway {
+            let direct_total = open + reserved_before;
+            if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT {
                 tracing::info!(
                     %peer_id,
-                    total_conn,
-                    accepted,
-                    "should_accept: topology manager decision"
+                    open,
+                    reserved_before,
+                    limit = GATEWAY_DIRECT_ACCEPT_LIMIT,
+                    "Gateway reached direct-accept limit; forwarding join request instead"
                 );
-                accepted
+                self.reserved_connections
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+                tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead");
+                return false;
             }
-        };
+        }
+        if self.location_for_peer.read().get(peer_id).is_some() {
+            // We've already accepted this peer (pending or active); treat as a no-op acceptance.
+            tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
+            self.reserved_connections
+                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+            return true;
+        }
+
+        let accepted = if total_conn < self.min_connections {
+            tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)");
+            true
+        } else if total_conn >= self.max_connections {
+            tracing::info!(%peer_id, total_conn, "should_accept: rejected (max connections reached)");
+            false
+        } else {
+            let accepted = self
+                .topology_manager
+                .write()
+                .evaluate_new_connection(location, Instant::now())
+                .unwrap_or(true);
+
+            tracing::info!(
+                %peer_id,
+                total_conn,
+                accepted,
+                "should_accept: topology manager decision"
+            );
+            accepted
+        };
 
         tracing::info!(
             %peer_id,
             accepted,
@@ -297,11 +295,13 @@ impl ConnectionManager {
 
     /// Record the advertised location for a peer that we have decided to accept.
     ///
-    /// Pending peers are tracked separately so that other operations cannot route through them
-    /// until the handshake is fully complete. Once the connection is established the entry is
-    /// removed automatically via `prune_in_transit_connection`.
+    /// This tracks the advertised location for pending handshakes so we can de-duplicate
+    /// concurrent attempts. Routing still relies on `connections_by_location`, so this
+    /// does not make the peer routable until the connection is fully established.
+    /// If the handshake fails, the entry is removed automatically via
+    /// `prune_in_transit_connection`.
     pub fn record_pending_location(&self, peer_id: &PeerId, location: Location) {
-        let mut locations = self.pending_locations.write();
+        let mut locations = self.location_for_peer.write();
         let entry = locations.entry(peer_id.clone());
         match entry {
             Entry::Occupied(_) => {
@@ -381,13 +381,8 @@ impl ConnectionManager {
         }
 
         let key = peer.clone();
-        self.transient_connections.insert(
-            peer,
-            TransientEntry {
-                opened_at: Instant::now(),
-                location,
-            },
-        );
+        self.transient_connections
+            .insert(peer, TransientEntry { location });
         let prev = self.transient_in_use.fetch_add(1, Ordering::SeqCst);
         if prev >= self.transient_budget {
             // Undo if we raced past the budget.
@@ -412,26 +407,22 @@ impl ConnectionManager {
         }
         removed
     }
 
-    #[allow(dead_code)]
+    /// Check whether a peer is currently tracked as transient.
     pub fn is_transient(&self, peer: &PeerId) -> bool {
         self.transient_connections.contains_key(peer)
     }
 
-    #[allow(dead_code)]
-    pub fn is_transient_addr(&self, addr: &SocketAddr) -> bool {
-        self.transient_connections
-            .iter()
-            .any(|entry| entry.key().addr == *addr)
-    }
-
+    /// Current number of tracked transient connections.
    pub fn transient_count(&self) -> usize {
        self.transient_in_use.load(Ordering::Acquire)
    }
 
+    /// Maximum transient slots allowed.
    pub fn transient_budget(&self) -> usize {
        self.transient_budget
    }
 
+    /// Time-to-live for transients before automatic drop.
    pub fn transient_ttl(&self) -> Duration {
        self.transient_ttl
    }
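`add_transient` (shown above) claims a slot with `fetch_add` and undoes the increment when it races past the budget; `drop_transient` releases the slot again. A self-contained sketch of that counter discipline, where `TransientBudget` is a hypothetical stand-in type:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the transient slot accounting above.
struct TransientBudget {
    in_use: AtomicUsize,
    budget: usize,
}

impl TransientBudget {
    /// Claim a slot optimistically; undo the increment if the claim raced
    /// past the budget (mirrors add_transient).
    fn try_acquire(&self) -> bool {
        let prev = self.in_use.fetch_add(1, Ordering::SeqCst);
        if prev >= self.budget {
            self.in_use.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    /// Release a slot (mirrors drop_transient).
    fn release(&self) {
        self.in_use.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let b = TransientBudget { in_use: AtomicUsize::new(0), budget: 1 };
    assert!(b.try_acquire());
    assert!(!b.try_acquire()); // budget exhausted, increment undone
    b.release();
    assert!(b.try_acquire()); // freed slot can be claimed again
}
```

The same counter backs `transient_count`, which is what the inbound-path budget checks in `p2p_protoc.rs` consult.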
@@ -458,10 +449,6 @@ impl ConnectionManager {
     pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) {
         tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology");
         debug_assert!(self.get_peer_key().expect("should be set") != peer);
-        {
-            let mut pending = self.pending_locations.write();
-            pending.remove(&peer);
-        }
         if was_reserved {
             let old = self
                 .reserved_connections
@@ -540,35 +527,26 @@ impl ConnectionManager {
         let connection_type = if is_alive { "active" } else { "in transit" };
         tracing::debug!(%peer, "Pruning {} connection", connection_type);
 
-        let loc = if is_alive {
-            let mut locations_for_peer = self.location_for_peer.write();
-            match locations_for_peer.remove(peer) {
-                Some(loc) => {
-                    let conns = &mut *self.connections_by_location.write();
-                    if let Some(conns) = conns.get_mut(&loc) {
-                        if let Some(pos) = conns.iter().position(|c| &c.location.peer == peer) {
-                            conns.swap_remove(pos);
-                        }
-                    }
-                    loc
-                }
-                None => {
-                    tracing::debug!("no location found for peer, skip pruning");
-                    return None;
-                }
-            }
-        } else {
-            match self.pending_locations.write().remove(peer) {
-                Some(loc) => loc,
-                None => {
-                    tracing::debug!("no pending location found for peer, skip pruning");
-                    self.reserved_connections
-                        .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-                    return None;
-                }
+        let mut locations_for_peer = self.location_for_peer.write();
+
+        let Some(loc) = locations_for_peer.remove(peer) else {
+            if is_alive {
+                tracing::debug!("no location found for peer, skip pruning");
+                return None;
+            } else {
+                self.reserved_connections
+                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
             }
+            return None;
         };
 
+        let conns = &mut *self.connections_by_location.write();
+        if let Some(conns) = conns.get_mut(&loc) {
+            if let Some(pos) = conns.iter().position(|c| &c.location.peer == peer) {
+                conns.swap_remove(pos);
+            }
+        }
+
         if is_alive {
             self.open_connections
                 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
@@ -609,6 +587,9 @@ impl ConnectionManager {
         let connections = self.connections_by_location.read();
         let peers = connections.values().filter_map(|conns| {
             let conn = conns.choose(&mut rand::rng())?;
+            if self.is_transient(&conn.location.peer) {
+                return None;
+            }
             if let Some(requester) = requesting {
                 if requester == &conn.location.peer {
                     return None;
                 }
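The rewritten `prune_in_transit_connection` collapses the old two-map branch into a single `location_for_peer` removal followed by cleanup of the location index. A simplified, runnable sketch of that flow, with `String`/`u64` stand-ins for `PeerId`/`Location`:

```rust
use std::collections::BTreeMap;

// Sketch of the unified pruning path: a single map now backs both pending
// and live entries, so removal is one lookup plus index cleanup.
fn prune(
    locations: &mut BTreeMap<String, u64>,
    by_location: &mut BTreeMap<u64, Vec<String>>,
    peer: &str,
) -> Option<u64> {
    let loc = locations.remove(peer)?;
    // Mirror the swap_remove over connections_by_location.
    if let Some(conns) = by_location.get_mut(&loc) {
        if let Some(pos) = conns.iter().position(|p| p == peer) {
            conns.swap_remove(pos);
        }
    }
    Some(loc)
}

fn main() {
    let mut locations = BTreeMap::from([("peer-a".to_string(), 3)]);
    let mut by_location = BTreeMap::from([(3, vec!["peer-a".to_string()])]);
    assert_eq!(prune(&mut locations, &mut by_location, "peer-a"), Some(3));
    assert!(by_location[&3].is_empty());
    // A second prune finds nothing: the entry was fully removed.
    assert_eq!(prune(&mut locations, &mut by_location, "peer-a"), None);
}
```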
@@ -637,118 +618,6 @@ impl ConnectionManager {
     }
 
     pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool {
-        if self.location_for_peer.read().contains_key(peer) {
-            return true;
-        }
-        self.pending_locations.read().contains_key(peer)
-    }
-
-    #[cfg(test)]
-    pub(crate) fn is_pending_connection(&self, peer: &PeerId) -> bool {
-        self.pending_locations.read().contains_key(peer)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::topology::rate::Rate;
-    use crate::transport::TransportKeypair;
-    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-    use std::sync::atomic::{AtomicU64, Ordering};
-    use std::time::Duration;
-
-    fn make_connection_manager() -> ConnectionManager {
-        let keypair = TransportKeypair::new();
-        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4100);
-        let own_location = Location::from_address(&addr);
-        let atomic_loc = AtomicU64::new(u64::from_le_bytes(own_location.as_f64().to_le_bytes()));
-        let self_peer = PeerId::new(addr, keypair.public().clone());
-        ConnectionManager::init(
-            Rate::new_per_second(10_000.0),
-            Rate::new_per_second(10_000.0),
-            1,
-            32,
-            4,
-            (keypair.public().clone(), Some(self_peer), atomic_loc),
-            false,
-            4,
-            Duration::from_secs(30),
-        )
-    }
-
-    fn make_peer(port: u16) -> PeerId {
-        let keypair = TransportKeypair::new();
-        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
-        PeerId::new(addr, keypair.public().clone())
-    }
-
-    #[test]
-    fn pending_connections_hidden_from_known_locations() {
-        let manager = make_connection_manager();
-        let peer = make_peer(4200);
-        let location = Location::from_address(&peer.addr);
-
-        assert!(manager.should_accept(location, &peer));
-        assert!(manager.is_pending_connection(&peer));
-        assert!(
-            !manager.get_known_locations().contains_key(&peer),
-            "pending connection leaked into established pool"
-        );
-
-        let restored = manager
-            .prune_in_transit_connection(&peer)
-            .expect("pending location should exist");
-        assert_eq!(restored, location);
-
-        manager.add_connection(restored, peer.clone(), false);
-        assert!(
-            !manager.is_pending_connection(&peer),
-            "pending slot should be cleared after promotion"
-        );
-
-        let known = manager.get_known_locations();
-        assert_eq!(known.get(&peer), Some(&location));
-    }
-
-    #[test]
-    fn should_accept_does_not_leak_reservations_for_duplicate_peer() {
-        let keypair = TransportKeypair::new();
-        let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20_000);
-        let peer_id = PeerId::new(peer_addr, keypair.public().clone());
-        let location = Location::from_address(&peer_addr);
-
-        let manager = ConnectionManager::init(
-            Rate::new_per_second(1_000_000.0),
-            Rate::new_per_second(1_000_000.0),
-            Ring::DEFAULT_MIN_CONNECTIONS,
-            Ring::DEFAULT_MAX_CONNECTIONS,
-            Ring::DEFAULT_RAND_WALK_ABOVE_HTL,
-            (keypair.public().clone(), None, AtomicU64::new(0)),
-            false,
-            32,
-            Duration::from_secs(30),
-        );
-
-        assert!(manager.should_accept(location, &peer_id));
-        let after_first = manager.reserved_connections.load(Ordering::SeqCst);
-        assert_eq!(after_first, 1);
-        {
-            assert!(
-                manager.is_pending_connection(&peer_id),
-                "pending connection should be tracked separately after initial acceptance"
-            );
-        }
-
-        // Second attempt for the same peer should be rejected immediately.
-        assert!(
-            !manager.should_accept(location, &peer_id),
-            "duplicate peer should be rejected by should_accept"
-        );
-        assert_eq!(
-            manager.reserved_connections.load(Ordering::SeqCst),
-            after_first,
-            "repeat should_accept calls should not leak reservations for an existing peer"
-        );
+        self.location_for_peer.read().contains_key(peer)
     }
 }
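With `pending_locations` gone, `has_connection_or_pending` reduces to a single map lookup, and the updated doc comment on `record_pending_location` explains why that is safe: routing never consults `location_for_peer` directly. A small sketch of the resulting invariant, with stand-in types rather than the real structs:

```rust
use std::collections::BTreeMap;

// Pending peers live only in location_for_peer (so duplicate handshakes are
// detected), while routing consults connections_by_location.
struct Manager {
    location_for_peer: BTreeMap<String, u64>,
    connections_by_location: BTreeMap<u64, Vec<String>>,
}

impl Manager {
    // Mirrors the one-line body that replaced the two-map check.
    fn has_connection_or_pending(&self, peer: &str) -> bool {
        self.location_for_peer.contains_key(peer)
    }

    fn is_routable(&self, peer: &str) -> bool {
        self.connections_by_location
            .values()
            .any(|conns| conns.iter().any(|p| p == peer))
    }
}

fn main() {
    let mut m = Manager {
        location_for_peer: BTreeMap::new(),
        connections_by_location: BTreeMap::new(),
    };
    // Accepting a peer records its location but does not make it routable.
    m.location_for_peer.insert("peer-a".into(), 7);
    assert!(m.has_connection_or_pending("peer-a"));
    assert!(!m.is_routable("peer-a"));
    // Only once the handshake completes does it join the routing index.
    m.connections_by_location.entry(7).or_default().push("peer-a".into());
    assert!(m.is_routable("peer-a"));
}
```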
diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs
index ee9afcb57..6233d71e2 100644
--- a/crates/core/src/ring/mod.rs
+++ b/crates/core/src/ring/mod.rs
@@ -4,7 +4,6 @@
 //! and routes requests to the optimal peers.
 
 use std::collections::{BTreeSet, HashSet};
-use std::net::SocketAddr;
 use std::{
     sync::{atomic::AtomicU64, Arc, Weak},
     time::{Duration, Instant},
 };
diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs
index f433ec932..e130d93fb 100644
--- a/crates/core/tests/test_network_integration.rs
+++ b/crates/core/tests/test_network_integration.rs
@@ -7,24 +7,17 @@ use freenet_test_network::TestNetwork;
 use testresult::TestResult;
 use tokio_tungstenite::connect_async;
 
-// Helper to get or create network
-async fn get_network() -> &'static TestNetwork {
-    use tokio::sync::OnceCell;
-    static NETWORK: OnceCell<TestNetwork> = OnceCell::const_new();
-
-    NETWORK
-        .get_or_init(|| async {
-            TestNetwork::builder()
-                .gateways(1)
-                .peers(2)
-                .binary(freenet_test_network::FreenetBinary::CurrentCrate(
-                    freenet_test_network::BuildProfile::Debug,
-                ))
-                .build()
-                .await
-                .expect("Failed to start test network")
-        })
+// Build a fresh network for each test to avoid `Sync` requirements on a shared static
+async fn get_network() -> TestNetwork {
+    TestNetwork::builder()
+        .gateways(1)
+        .peers(2)
+        .binary(freenet_test_network::FreenetBinary::CurrentCrate(
+            freenet_test_network::BuildProfile::Debug,
+        ))
+        .build()
         .await
+        .expect("Failed to start test network")
 }
 
 #[tokio::test]
diff --git a/crates/core/tests/token_expiration.rs b/crates/core/tests/token_expiration.rs
index e51f10cf1..ff37cc72a 100644
--- a/crates/core/tests/token_expiration.rs
+++ b/crates/core/tests/token_expiration.rs
@@ -38,6 +38,8 @@ async fn create_test_config(
         network_api: NetworkArgs {
             address: Some(Ipv4Addr::LOCALHOST.into()),
             network_port: Some(network_socket.local_addr()?.port()),
+            transient_budget: None,
+            transient_ttl_secs: None,
             ..Default::default()
         },
         config_paths: freenet::config::ConfigPathsArgs {
@@ -130,6 +132,8 @@ async fn test_default_token_configuration() -> TestResult {
         network_api: NetworkArgs {
             address: Some(Ipv4Addr::LOCALHOST.into()),
             network_port: Some(network_socket.local_addr()?.port()),
+            transient_budget: None,
+            transient_ttl_secs: None,
             ..Default::default()
         },
         config_paths: freenet::config::ConfigPathsArgs {