From 69efe795fafb86a6953f08e2d8a9429e230d62f7 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 15:39:32 -0600 Subject: [PATCH 01/10] fix: track transient connections separately --- .../src/node/network_bridge/p2p_protoc.rs | 20 ++++++++++++------- crates/core/src/ring/connection_manager.rs | 7 +++++-- crates/core/src/ring/mod.rs | 1 - crates/core/tests/token_expiration.rs | 4 ++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index ad92e36d4..55ccc6160 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::net::UdpSocket; use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender}; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::Instrument; use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge}; @@ -1275,14 +1275,13 @@ impl P2pConnManager { ); } - // If we already have a transport channel, reuse it instead of dialing again. This covers - // transient->normal promotion without tripping duplicate connection errors. + // If a transient transport already exists, promote it without dialing anew. if self.connections.contains_key(&peer) { tracing::info!( tx = %tx, remote = %peer, courtesy, - "connect_peer: reusing existing transport" + "connect_peer: reusing existing transport / promoting transient if present" ); let connection_manager = &self.bridge.op_manager.ring.connection_manager; if let Some(entry) = connection_manager.drop_transient(&peer) { @@ -1297,8 +1296,11 @@ impl P2pConnManager { tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } + let resolved_peer_id = connection_manager + .get_peer_key() + .expect("peer key should be set"); callback - .send_result(Ok((peer.clone(), None))) + .send_result(Ok((resolved_peer_id, None))) .await .inspect_err(|err| { tracing::debug!( @@ -1718,14 +1720,18 @@ impl P2pConnManager { let cm = connection_manager.clone(); let peer = peer_id.clone(); tokio::spawn(async move { - tokio::time::sleep(ttl).await; + sleep(ttl).await; if cm.drop_transient(&peer).is_some() { tracing::info!(%peer, "Transient connection expired; dropping"); if let Err(err) = drop_tx .send(Right(NodeEvent::DropConnection(peer.clone()))) .await { - tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient"); + tracing::warn!( + %peer, + ?err, + "Failed to dispatch DropConnection for expired transient" + ); } } }); diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 488e62cdf..26c62b164 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -2,12 +2,13 @@ use dashmap::DashMap; use parking_lot::Mutex; use rand::prelude::IndexedRandom; use std::collections::{btree_map::Entry, BTreeMap}; +use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; use crate::topology::{Limits, TopologyManager}; use super::*; -use std::time::{Duration, Instant}; #[derive(Clone)] pub(crate) struct TransientEntry { @@ -412,7 +413,6 @@ impl ConnectionManager { removed } - #[allow(dead_code)] pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } @@ -609,6 +609,9 @@ impl ConnectionManager { let connections = self.connections_by_location.read(); let peers = connections.values().filter_map(|conns| { let conn = conns.choose(&mut rand::rng())?; + if self.is_transient(&conn.location.peer) { + return None; + } if let Some(requester) = requesting { if requester == &conn.location.peer { return None; diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index ee9afcb57..6233d71e2 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -4,7 +4,6 @@ //! and routes requests to the optimal peers. use std::collections::{BTreeSet, HashSet}; -use std::net::SocketAddr; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, diff --git a/crates/core/tests/token_expiration.rs b/crates/core/tests/token_expiration.rs index e51f10cf1..ff37cc72a 100644 --- a/crates/core/tests/token_expiration.rs +++ b/crates/core/tests/token_expiration.rs @@ -38,6 +38,8 @@ async fn create_test_config( network_api: NetworkArgs { address: Some(Ipv4Addr::LOCALHOST.into()), network_port: Some(network_socket.local_addr()?.port()), + transient_budget: None, + transient_ttl_secs: None, ..Default::default() }, config_paths: freenet::config::ConfigPathsArgs { @@ -130,6 +132,8 @@ async fn test_default_token_configuration() -> TestResult { network_api: NetworkArgs { address: Some(Ipv4Addr::LOCALHOST.into()), network_port: Some(network_socket.local_addr()?.port()), + transient_budget: None, + transient_ttl_secs: None, ..Default::default() }, config_paths: freenet::config::ConfigPathsArgs { From 7f1c1f0ee1f1873527706d9920021711bc7c850e Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 15:40:43 -0600 Subject: [PATCH 02/10] fix: tidy transient registry formatting --- crates/core/src/ring/connection_manager.rs | 288 ++++++--------------- 1 file changed, 78 insertions(+), 210 deletions(-) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 26c62b164..88794c1da 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -22,7 +22,6 @@ pub(crate) struct ConnectionManager { open_connections: Arc, reserved_connections: Arc, pub(super) location_for_peer: Arc>>, - pending_locations: Arc>>, pub(super) topology_manager: Arc>, connections_by_location: Arc>>>, /// Interim connections ongoing handshake or successfully open connections @@ -122,7 +121,6 @@ impl ConnectionManager { Self { connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), - pending_locations: Arc::new(RwLock::new(BTreeMap::new())), open_connections: Arc::new(AtomicUsize::new(0)), reserved_connections: Arc::new(AtomicUsize::new(0)), topology_manager, @@ -165,16 +163,6 @@ impl ConnectionManager { "should_accept: evaluating direct acceptance guard" ); - if self.has_connection_or_pending(peer_id) { - tracing::debug!( - %peer_id, - open, - reserved_before, - "Peer already connected; rejecting duplicate reservation" - ); - return false; - } - if self.is_gateway && (open > 0 || reserved_before > 0) { tracing::info!( %peer_id, @@ -231,51 +219,56 @@ impl ConnectionManager { } }; - let accepted = if open == 0 { + if open == 0 { tracing::debug!(%peer_id, "should_accept: first connection -> accepting"); - true - } else { - const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2; - if self.is_gateway { - let direct_total = open + reserved_before; - if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT { - tracing::info!( - %peer_id, - open, - reserved_before, - limit = GATEWAY_DIRECT_ACCEPT_LIMIT, - "Gateway reached direct-accept limit; forwarding join request instead" - ); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); - return false; - } - } - - if total_conn < self.min_connections { - tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); - true - } else if total_conn >= self.max_connections { - tracing::info!(%peer_id, total_conn, "should_accept: rejected (max connections reached)"); - false - } else { - let accepted = self - .topology_manager - .write() - .evaluate_new_connection(location, Instant::now()) - .unwrap_or(true); + return true; + } + const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2; + if self.is_gateway { + let direct_total = open + reserved_before; + if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT { tracing::info!( %peer_id, - total_conn, - accepted, - "should_accept: topology manager decision" + open, + reserved_before, + limit = GATEWAY_DIRECT_ACCEPT_LIMIT, + "Gateway reached direct-accept limit; forwarding join request instead" ); - accepted + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); + return false; } - }; + } + + if self.location_for_peer.read().get(peer_id).is_some() { + // We've already accepted this peer (pending or active); treat as a no-op acceptance. + tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + return true; + } + let accepted = if total_conn < self.min_connections { + tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); + true + } else if total_conn >= self.max_connections { + tracing::info!(%peer_id, total_conn, "should_accept: rejected (max connections reached)"); + false + } else { + let accepted = self + .topology_manager + .write() + .evaluate_new_connection(location, Instant::now()) + .unwrap_or(true); + + tracing::info!( + %peer_id, + total_conn, + accepted, + "should_accept: topology manager decision" + ); + accepted + }; tracing::info!( %peer_id, accepted, @@ -298,11 +291,11 @@ impl ConnectionManager { /// Record the advertised location for a peer that we have decided to accept. /// - /// Pending peers are tracked separately so that other operations cannot route through them - /// until the handshake is fully complete. Once the connection is established the entry is - /// removed automatically via `prune_in_transit_connection`. + /// This makes the peer discoverable to the routing layer even before the connection + /// is fully established. The entry is removed automatically if the handshake fails + /// via `prune_in_transit_connection`. pub fn record_pending_location(&self, peer_id: &PeerId, location: Location) { - let mut locations = self.pending_locations.write(); + let mut locations = self.location_for_peer.write(); let entry = locations.entry(peer_id.clone()); match entry { Entry::Occupied(_) => { @@ -400,6 +393,14 @@ impl ConnectionManager { true } + /// Registers (or updates) a transient connection without performing budget checks. + /// Used when the caller already reserved budget via `try_register_transient`. + pub fn register_transient(&self, peer: PeerId, location: Option) { + if !self.try_register_transient(peer.clone(), location) { + tracing::warn!(%peer, "register_transient: budget exhausted while updating"); + } + } + /// Drops a transient connection and returns its metadata, if it existed. /// Also decrements the transient budget counter. pub fn drop_transient(&self, peer: &PeerId) -> Option { @@ -413,15 +414,16 @@ impl ConnectionManager { removed } + pub fn deregister_transient(&self, peer: &PeerId) -> Option { + self.drop_transient(peer) + } + pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } - #[allow(dead_code)] - pub fn is_transient_addr(&self, addr: &SocketAddr) -> bool { - self.transient_connections - .iter() - .any(|entry| entry.key().addr == *addr) + pub fn is_transient_peer(&self, peer: &PeerId) -> bool { + self.is_transient(peer) } pub fn transient_count(&self) -> usize { @@ -458,10 +460,6 @@ impl ConnectionManager { pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) { tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology"); debug_assert!(self.get_peer_key().expect("should be set") != peer); - { - let mut pending = self.pending_locations.write(); - pending.remove(&peer); - } if was_reserved { let old = self .reserved_connections @@ -540,35 +538,26 @@ impl ConnectionManager { let connection_type = if is_alive { "active" } else { "in transit" }; tracing::debug!(%peer, "Pruning {} connection", connection_type); - let loc = if is_alive { - let mut locations_for_peer = self.location_for_peer.write(); - match locations_for_peer.remove(peer) { - Some(loc) => { - let conns = &mut *self.connections_by_location.write(); - if let Some(conns) = conns.get_mut(&loc) { - if let Some(pos) = conns.iter().position(|c| &c.location.peer == peer) { - conns.swap_remove(pos); - } - } - loc - } - None => { - tracing::debug!("no location found for peer, skip pruning"); - return None; - } - } - } else { - match self.pending_locations.write().remove(peer) { - Some(loc) => loc, - None => { - tracing::debug!("no pending location found for peer, skip pruning"); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - return None; - } + let mut locations_for_peer = self.location_for_peer.write(); + + let Some(loc) = locations_for_peer.remove(peer) else { + if is_alive { + tracing::debug!("no location found for peer, skip pruning"); + return None; + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } + return None; }; + let conns = &mut *self.connections_by_location.write(); + if let Some(conns) = conns.get_mut(&loc) { + if let Some(pos) = conns.iter().position(|c| &c.location.peer == peer) { + conns.swap_remove(pos); + } + } + if is_alive { self.open_connections .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); @@ -585,11 +574,6 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } - pub(crate) fn get_reserved_connections(&self) -> usize { - self.reserved_connections - .load(std::sync::atomic::Ordering::SeqCst) - } - pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } @@ -638,120 +622,4 @@ impl ConnectionManager { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } - - pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { - if self.location_for_peer.read().contains_key(peer) { - return true; - } - self.pending_locations.read().contains_key(peer) - } - - #[cfg(test)] - pub(crate) fn is_pending_connection(&self, peer: &PeerId) -> bool { - self.pending_locations.read().contains_key(peer) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::topology::rate::Rate; - use crate::transport::TransportKeypair; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::sync::atomic::{AtomicU64, Ordering}; - use std::time::Duration; - - fn make_connection_manager() -> ConnectionManager { - let keypair = TransportKeypair::new(); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4100); - let own_location = Location::from_address(&addr); - let atomic_loc = AtomicU64::new(u64::from_le_bytes(own_location.as_f64().to_le_bytes())); - let self_peer = PeerId::new(addr, keypair.public().clone()); - ConnectionManager::init( - Rate::new_per_second(10_000.0), - Rate::new_per_second(10_000.0), - 1, - 32, - 4, - (keypair.public().clone(), Some(self_peer), atomic_loc), - false, - 4, - Duration::from_secs(30), - ) - } - - fn make_peer(port: u16) -> PeerId { - let keypair = TransportKeypair::new(); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - PeerId::new(addr, keypair.public().clone()) - } - - #[test] - fn pending_connections_hidden_from_known_locations() { - let manager = make_connection_manager(); - let peer = make_peer(4200); - let location = Location::from_address(&peer.addr); - - assert!(manager.should_accept(location, &peer)); - assert!(manager.is_pending_connection(&peer)); - assert!( - !manager.get_known_locations().contains_key(&peer), - "pending connection leaked into established pool" - ); - - let restored = manager - .prune_in_transit_connection(&peer) - .expect("pending location should exist"); - assert_eq!(restored, location); - - manager.add_connection(restored, peer.clone(), false); - assert!( - !manager.is_pending_connection(&peer), - "pending slot should be cleared after promotion" - ); - - let known = manager.get_known_locations(); - assert_eq!(known.get(&peer), Some(&location)); - } - - #[test] - fn should_accept_does_not_leak_reservations_for_duplicate_peer() { - let keypair = TransportKeypair::new(); - let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 20_000); - let peer_id = PeerId::new(peer_addr, keypair.public().clone()); - let location = Location::from_address(&peer_addr); - - let manager = ConnectionManager::init( - Rate::new_per_second(1_000_000.0), - Rate::new_per_second(1_000_000.0), - Ring::DEFAULT_MIN_CONNECTIONS, - Ring::DEFAULT_MAX_CONNECTIONS, - Ring::DEFAULT_RAND_WALK_ABOVE_HTL, - (keypair.public().clone(), None, AtomicU64::new(0)), - false, - 32, - Duration::from_secs(30), - ); - - assert!(manager.should_accept(location, &peer_id)); - let after_first = manager.reserved_connections.load(Ordering::SeqCst); - assert_eq!(after_first, 1); - { - assert!( - manager.is_pending_connection(&peer_id), - "pending connection should be tracked separately after initial acceptance" - ); - } - - // Second attempt for the same peer should be rejected immediately. - assert!( - !manager.should_accept(location, &peer_id), - "duplicate peer should be rejected by should_accept" - ); - assert_eq!( - manager.reserved_connections.load(Ordering::SeqCst), - after_first, - "repeat should_accept calls should not leak reservations for an existing peer" - ); - } } From 315d0f9b91987deaae90de4ffdccc2099341e6dd Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 17:13:34 -0600 Subject: [PATCH 03/10] fix: clean transient promotion handling --- crates/core/src/node/network_bridge/p2p_protoc.rs | 12 +++++++++--- crates/core/src/ring/connection_manager.rs | 8 +++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 55ccc6160..69ff66c2a 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1296,9 +1296,8 @@ impl P2pConnManager { tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } - let resolved_peer_id = connection_manager - .get_peer_key() - .expect("peer key should be set"); + // Return the remote peer we are connected to (not our own peer key). + let resolved_peer_id = peer.clone(); callback .send_result(Ok((resolved_peer_id, None))) .await @@ -1619,8 +1618,15 @@ impl P2pConnManager { current = connection_manager.transient_count(), "Transient connection budget exhausted; dropping inbound connection" ); + if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) { + for mut cb in callbacks { + let _ = cb.send_result(Err(())).await; + } + } + state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } + } let pending_txs = state .awaiting_connection_txs diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 88794c1da..3787387cd 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -14,6 +14,7 @@ use super::*; pub(crate) struct TransientEntry { #[allow(dead_code)] pub opened_at: Instant, + /// Advertised location for the transient peer, if known at admission time. pub location: Option, } @@ -393,7 +394,7 @@ impl ConnectionManager { true } - /// Registers (or updates) a transient connection without performing budget checks. + /// Record a transient connection for bookkeeping (kept out of routing/topology counts). /// Used when the caller already reserved budget via `try_register_transient`. pub fn register_transient(&self, peer: PeerId, location: Option) { if !self.try_register_transient(peer.clone(), location) { @@ -414,10 +415,12 @@ impl ConnectionManager { removed } + /// Remove transient tracking for a peer, returning any stored metadata. pub fn deregister_transient(&self, peer: &PeerId) -> Option { self.drop_transient(peer) } + /// Check whether a peer is currently tracked as transient. pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } @@ -426,14 +429,17 @@ impl ConnectionManager { self.is_transient(peer) } + /// Current number of tracked transient connections. pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) } + /// Maximum transient slots allowed. pub fn transient_budget(&self) -> usize { self.transient_budget } + /// Time-to-live for transients before automatic drop. pub fn transient_ttl(&self) -> Duration { self.transient_ttl } From 5ade1762ef813738485650c3f74f380c8471ee03 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 17:16:14 -0600 Subject: [PATCH 04/10] fix: honor transient budget and promote correctly --- crates/core/src/node/network_bridge/p2p_protoc.rs | 15 +++++++++++++-- crates/core/src/ring/connection_manager.rs | 10 +++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 69ff66c2a..f166f6968 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1626,8 +1626,6 @@ impl P2pConnManager { state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } - } - let pending_txs = state .awaiting_connection_txs .remove(&peer_id.addr) @@ -1690,6 +1688,19 @@ impl P2pConnManager { // Only insert if connection doesn't already exist to avoid dropping existing channel let mut newly_inserted = false; if !self.connections.contains_key(&peer_id) { + if is_transient { + let cm = &self.bridge.op_manager.ring.connection_manager; + let current = cm.transient_count(); + if current >= cm.transient_budget() { + tracing::warn!( + remote = %peer_id.addr, + budget = cm.transient_budget(), + current, + "Transient connection budget exhausted; dropping inbound connection before insert" + ); + return Ok(()); + } + } let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 3787387cd..6594330f3 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -12,6 +12,8 @@ use super::*; #[derive(Clone)] pub(crate) struct TransientEntry { + /// Entry tracking a transient connection that hasn't been added to the ring topology yet. + /// Transient connections are typically unsolicited inbound connections to gateways. #[allow(dead_code)] pub opened_at: Instant, /// Advertised location for the transient peer, if known at admission time. @@ -394,8 +396,8 @@ impl ConnectionManager { true } - /// Record a transient connection for bookkeeping (kept out of routing/topology counts). - /// Used when the caller already reserved budget via `try_register_transient`. + /// Registers a new transient connection that is not yet part of the ring topology. + /// Transient connections are tracked separately and subject to budget and TTL limits. pub fn register_transient(&self, peer: PeerId, location: Option) { if !self.try_register_transient(peer.clone(), location) { tracing::warn!(%peer, "register_transient: budget exhausted while updating"); @@ -415,7 +417,9 @@ impl ConnectionManager { removed } - /// Remove transient tracking for a peer, returning any stored metadata. + /// Deregisters a transient connection, removing it from tracking. + /// + /// Returns the removed entry if it existed. pub fn deregister_transient(&self, peer: &PeerId) -> Option { self.drop_transient(peer) } From f42d1d5743800a3049893e18a928c57159d872dc Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 13:02:23 -0600 Subject: [PATCH 05/10] fix: remove unused transient helpers --- crates/core/src/ring/connection_manager.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 6594330f3..d361e2ca2 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -396,14 +396,6 @@ impl ConnectionManager { true } - /// Registers a new transient connection that is not yet part of the ring topology. - /// Transient connections are tracked separately and subject to budget and TTL limits. - pub fn register_transient(&self, peer: PeerId, location: Option) { - if !self.try_register_transient(peer.clone(), location) { - tracing::warn!(%peer, "register_transient: budget exhausted while updating"); - } - } - /// Drops a transient connection and returns its metadata, if it existed. /// Also decrements the transient budget counter. pub fn drop_transient(&self, peer: &PeerId) -> Option { @@ -417,22 +409,11 @@ impl ConnectionManager { removed } - /// Deregisters a transient connection, removing it from tracking. - /// - /// Returns the removed entry if it existed. - pub fn deregister_transient(&self, peer: &PeerId) -> Option { - self.drop_transient(peer) - } - /// Check whether a peer is currently tracked as transient. pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } - pub fn is_transient_peer(&self, peer: &PeerId) -> bool { - self.is_transient(peer) - } - /// Current number of tracked transient connections. pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) From 2209c452bea05c32271ada11a1630bcab50be32e Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 21 Nov 2025 22:27:42 -0600 Subject: [PATCH 06/10] fix: restore connection manager accessors --- crates/core/src/ring/connection_manager.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index d361e2ca2..3b4f670ea 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -565,6 +565,11 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } + pub(crate) fn get_reserved_connections(&self) -> usize { + self.reserved_connections + .load(std::sync::atomic::Ordering::SeqCst) + } + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } @@ -613,4 +618,8 @@ impl ConnectionManager { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } + + pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { + self.location_for_peer.read().contains_key(peer) + } } From d8bf24726eb51812a186791f6d875d7434c1d312 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 22 Nov 2025 10:28:38 -0600 Subject: [PATCH 07/10] test: avoid static test network --- crates/core/tests/test_network_integration.rs | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs index f433ec932..e130d93fb 100644 --- a/crates/core/tests/test_network_integration.rs +++ b/crates/core/tests/test_network_integration.rs @@ -7,24 +7,17 @@ use freenet_test_network::TestNetwork; use testresult::TestResult; use tokio_tungstenite::connect_async; -// Helper to get or create network -async fn get_network() -> &'static TestNetwork { - use tokio::sync::OnceCell; - static NETWORK: OnceCell = OnceCell::const_new(); - - NETWORK - .get_or_init(|| async { - TestNetwork::builder() - .gateways(1) - .peers(2) - .binary(freenet_test_network::FreenetBinary::CurrentCrate( - freenet_test_network::BuildProfile::Debug, - )) - .build() - .await - .expect("Failed to start test network") - }) +// Build a fresh network for each test to avoid static Sync requirements +async fn get_network() -> TestNetwork { + TestNetwork::builder() + .gateways(1) + .peers(2) + .binary(freenet_test_network::FreenetBinary::CurrentCrate( + freenet_test_network::BuildProfile::Debug, + )) + .build() .await + .expect("Failed to start test network") } #[tokio::test] From b7da1dcdfb16da9c1729303e18a06dd0aa5ac04f Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 22 Nov 2025 20:53:18 -0600 Subject: [PATCH 08/10] fix: recheck admission on transient promotion --- .../src/node/network_bridge/p2p_protoc.rs | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index f166f6968..831c9044b 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1288,12 +1288,38 @@ impl P2pConnManager { let loc = entry .location .unwrap_or_else(|| Location::from_address(&peer.addr)); - self.bridge - .op_manager - .ring - .add_connection(loc, peer.clone(), false) - .await; - tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); + if connection_manager.should_accept(loc, &peer) { + let current = connection_manager.num_connections(); + if current >= connection_manager.max_connections { + tracing::warn!( + tx = %tx, + remote = %peer, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "connect_peer: transient promotion rejected due to capacity" + ); + } else { + self.bridge + .op_manager + .ring + .add_connection(loc, peer.clone(), false) + .await; + tracing::info!( + tx = %tx, + remote = %peer, + %loc, + "connect_peer: promoted transient after admission check" + ); + } + } else { + tracing::warn!( + tx = %tx, + remote = %peer, + %loc, + "connect_peer: transient failed admission on promotion" + ); + } } // Return the remote peer we are connected to (not our own peer key). From 08ff8ff857918bfecd471954985c6e6b85f368cd Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 23 Nov 2025 09:39:08 -0600 Subject: [PATCH 09/10] fix(connection): address acceptance reservations --- crates/core/src/node/network_bridge/p2p_protoc.rs | 4 ++-- crates/core/src/ring/connection_manager.rs | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 831c9044b..ecfc53f22 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1303,7 +1303,7 @@ impl P2pConnManager { self.bridge .op_manager .ring - .add_connection(loc, peer.clone(), false) + .add_connection(loc, peer.clone(), true) .await; tracing::info!( tx = %tx, @@ -1749,7 +1749,7 @@ impl P2pConnManager { self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), false) + .add_connection(loc, peer_id.clone(), true) .await; } else { // Update location now that we know it; budget was reserved before any work. diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 3b4f670ea..e57e2a3fe 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -14,8 +14,6 @@ use super::*; pub(crate) struct TransientEntry { /// Entry tracking a transient connection that hasn't been added to the ring topology yet. /// Transient connections are typically unsolicited inbound connections to gateways. - #[allow(dead_code)] - pub opened_at: Instant, /// Advertised location for the transient peer, if known at admission time. pub location: Option, } @@ -224,6 +222,7 @@ impl ConnectionManager { if open == 0 { tracing::debug!(%peer_id, "should_accept: first connection -> accepting"); + self.record_pending_location(peer_id, location); return true; } @@ -248,6 +247,8 @@ impl ConnectionManager { if self.location_for_peer.read().get(peer_id).is_some() { // We've already accepted this peer (pending or active); treat as a no-op acceptance. tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); return true; } @@ -294,9 +295,11 @@ impl ConnectionManager { /// Record the advertised location for a peer that we have decided to accept. /// - /// This makes the peer discoverable to the routing layer even before the connection - /// is fully established. The entry is removed automatically if the handshake fails - /// via `prune_in_transit_connection`. + /// This tracks the advertised location for pending handshakes so we can de-duplicate + /// concurrent attempts. Routing still relies on `connections_by_location`, so this + /// does not make the peer routable until the connection is fully established. + /// The entry is removed automatically if the handshake fails via + /// `prune_in_transit_connection`. pub fn record_pending_location(&self, peer_id: &PeerId, location: Location) { let mut locations = self.location_for_peer.write(); let entry = locations.entry(peer_id.clone()); @@ -381,7 +384,6 @@ impl ConnectionManager { self.transient_connections.insert( peer, TransientEntry { - opened_at: Instant::now(), location, }, ); From 21a02469608ee6c63eeb578303f711c1b15b37b5 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 23 Nov 2025 13:37:25 -0600 Subject: [PATCH 10/10] fix(connection): avoid double reservation on transient promotion --- crates/core/src/node/network_bridge/p2p_protoc.rs | 2 +- crates/core/src/ring/connection_manager.rs | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index ecfc53f22..c81136967 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1749,7 +1749,7 @@ impl P2pConnManager { self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), true) + .add_connection(loc, peer_id.clone(), false) .await; } else { // Update location now that we know it; budget was reserved before any work. diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index e57e2a3fe..a1a1cf2b8 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -381,12 +381,8 @@ impl ConnectionManager { } let key = peer.clone(); - self.transient_connections.insert( - peer, - TransientEntry { - location, - }, - ); + self.transient_connections + .insert(peer, TransientEntry { location }); let prev = self.transient_in_use.fetch_add(1, Ordering::SeqCst); if prev >= self.transient_budget { // Undo if we raced past the budget.