From 424617dabdf7f05fc78ac25809a31d10174526c4 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Wed, 19 Nov 2025 21:35:00 -0600
Subject: [PATCH 01/13] refactor: rename courtesy links to transient

---
 .../core/src/node/network_bridge/handshake.rs |  95 +++++-----
 .../src/node/network_bridge/p2p_protoc.rs     | 123 ++++---------
 crates/core/src/operations/connect.rs         | 100 ++++++-----
 crates/core/src/ring/connection_manager.rs    | 169 +++++++++++-------
 4 files changed, 238 insertions(+), 249 deletions(-)

diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs
index d3def07e1..c2f3c8b31 100644
--- a/crates/core/src/node/network_bridge/handshake.rs
+++ b/crates/core/src/node/network_bridge/handshake.rs
@@ -31,21 +31,21 @@ pub(crate) enum Event {
         transaction: Option<Transaction>,
         peer: Option<PeerId>,
         connection: PeerConnection,
-        courtesy: bool,
+        transient: bool,
     },
     /// An outbound connection attempt succeeded.
     OutboundEstablished {
         transaction: Transaction,
         peer: PeerId,
         connection: PeerConnection,
-        courtesy: bool,
+        transient: bool,
     },
     /// An outbound connection attempt failed.
     OutboundFailed {
         transaction: Transaction,
         peer: PeerId,
         error: ConnectionError,
-        courtesy: bool,
+        transient: bool,
     },
 }

@@ -56,13 +56,13 @@ pub(crate) enum Command {
     Connect {
         peer: PeerId,
         transaction: Transaction,
-        courtesy: bool,
+        transient: bool,
     },
     /// Register expectation for an inbound connection from `peer`.
     ExpectInbound {
         peer: PeerId,
         transaction: Option<Transaction>,
-        courtesy: bool,
+        transient: bool,
     },
     /// Remove state associated with `peer`.
     DropConnection { peer: PeerId },
 }

@@ -122,37 +122,35 @@ impl Stream for HandshakeHandler {
 struct ExpectedInbound {
     peer: PeerId,
     transaction: Option<Transaction>,
-    courtesy: bool,
+    transient: bool, // TODO: rename the protocol-level field to "transient" once we migrate terminology
 }

 #[derive(Default)]
 struct ExpectedInboundTracker {
-    // Keyed by remote IP to tolerate port changes; multiple expectations per IP
-    // are tracked and deduped by port.
     entries: HashMap<IpAddr, Vec<ExpectedInbound>>,
 }

 impl ExpectedInboundTracker {
-    fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, courtesy: bool) {
+    fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, transient: bool) {
         tracing::debug!(
             remote = %peer.addr,
-            courtesy,
+            transient,
             tx = ?transaction,
             "ExpectInbound: registering expectation"
         );
-        let list = self.entries.entry(peer.addr.ip()).or_default();
-        // Replace any existing expectation for the same peer/port so the newest wins.
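
A standalone sketch of the per-port dedup behaviour this hunk removes (and
which PATCH 08 below restores); `Entry` and `Tracker` are hypothetical
stand-ins for the `ExpectedInbound` fields that matter here:

    use std::collections::HashMap;
    use std::net::{IpAddr, SocketAddr};

    struct Entry {
        addr: SocketAddr,
        transient: bool,
    }

    #[derive(Default)]
    struct Tracker {
        entries: HashMap<IpAddr, Vec<Entry>>,
    }

    impl Tracker {
        // The newest registration for a given ip:port wins; other ports on the
        // same IP are kept so several peers behind one NAT can be expected at once.
        fn register(&mut self, addr: SocketAddr, transient: bool) {
            let list = self.entries.entry(addr.ip()).or_default();
            list.retain(|e| e.addr.port() != addr.port());
            list.push(Entry { addr, transient });
        }
    }
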
-        list.retain(|entry| entry.peer.addr.port() != peer.addr.port());
-        list.push(ExpectedInbound {
-            peer,
-            transaction,
-            courtesy,
-        });
+        self.entries
+            .entry(peer.addr.ip())
+            .or_default()
+            .push(ExpectedInbound {
+                peer,
+                transaction,
+                transient,
+            });
     }

     fn drop_peer(&mut self, peer: &PeerId) {
         if let Some(list) = self.entries.get_mut(&peer.addr.ip()) {
-            list.retain(|entry| entry.peer.addr.port() != peer.addr.port());
+            list.retain(|entry| entry.peer != *peer);
             if list.is_empty() {
                 self.entries.remove(&peer.addr.ip());
             }
@@ -160,26 +158,33 @@ impl ExpectedInboundTracker {
     }

     fn consume(&mut self, addr: SocketAddr) -> Option<ExpectedInbound> {
-        let list = self.entries.get_mut(&addr.ip())?;
-        let pos = list
+        let ip = addr.ip();
+        let list = self.entries.get_mut(&ip)?;
+        if let Some(pos) = list
             .iter()
-            .position(|entry| entry.peer.addr.port() == addr.port())?;
-        let entry = list.swap_remove(pos);
+            .position(|entry| entry.peer.addr.port() == addr.port())
+        {
+            let entry = list.remove(pos);
+            if list.is_empty() {
+                self.entries.remove(&ip);
+            }
+            tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by exact port");
+            return Some(entry);
+        }
+        let entry = list.pop();
         if list.is_empty() {
-            self.entries.remove(&addr.ip());
+            self.entries.remove(&ip);
+        }
+        if let Some(entry) = entry {
+            tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by IP fallback");
+            return Some(entry);
         }
-        Some(entry)
+        None
     }

     #[cfg(test)]
     fn contains(&self, addr: SocketAddr) -> bool {
-        self.entries
-            .get(&addr.ip())
-            .map(|list| {
-                list.iter()
-                    .any(|entry| entry.peer.addr.port() == addr.port())
-            })
-            .unwrap_or(false)
+        self.entries.contains_key(&addr)
     }
 }

@@ -197,12 +202,12 @@ async fn run_driver(
     loop {
         select! {
            command = commands_rx.recv() => match command {
-                Some(Command::Connect { peer, transaction, courtesy }) => {
-                    spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, courtesy, peer_ready.clone());
-                }
-                Some(Command::ExpectInbound { peer, transaction, courtesy }) => {
-                    expected_inbound.register(peer, transaction, courtesy);
+                Some(Command::Connect { peer, transaction, transient }) => {
+                    spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, transient, peer_ready.clone());
                 }
+                Some(Command::ExpectInbound { peer, transaction, transient }) => {
+                    expected_inbound.register(peer, transaction, transient);
+                }
                 Some(Command::DropConnection { peer }) => {
                     expected_inbound.drop_peer(&peer);
                 }
@@ -217,8 +222,8 @@ async fn run_driver(
                 let remote_addr = conn.remote_addr();
                 let entry = expected_inbound.consume(remote_addr);

-                let (peer, transaction, courtesy) = if let Some(entry) = entry {
-                    (Some(entry.peer), entry.transaction, entry.courtesy)
+                let (peer, transaction, transient) = if let Some(entry) = entry {
+                    (Some(entry.peer), entry.transaction, entry.transient)
                 } else {
                     (None, None, false)
                 };
@@ -227,7 +232,7 @@ async fn run_driver(
                     transaction,
                     peer,
                     connection: conn,
-                    courtesy,
+                    transient,
                 }).await.is_err() {
                     break;
                 }
@@ -244,7 +249,7 @@ fn spawn_outbound(
     events_tx: mpsc::Sender<Event>,
     peer: PeerId,
     transaction: Transaction,
-    courtesy: bool,
+    transient: bool,
     peer_ready: Option>,
 ) {
     tokio::spawn(async move {
@@ -268,13 +273,13 @@ fn spawn_outbound(
                 transaction,
                 peer: peer.clone(),
                 connection,
-                courtesy,
+                transient,
             },
             Err(error) => Event::OutboundFailed {
                 transaction,
                 peer: peer.clone(),
                 error,
-                courtesy,
+                transient,
             },
         };

@@ -307,7 +312,7 @@ mod tests {
             .expect("expected registered inbound entry");
         assert_eq!(entry.peer, peer);
         assert_eq!(entry.transaction, Some(tx));
-        assert!(entry.courtesy);
+        assert!(entry.transient);

         assert!(tracker.consume(peer.addr).is_none());
     }

@@ -335,6 +340,6 @@ mod tests {
             .consume(peer.addr)
             .expect("entry should be present after overwrite");
         assert_eq!(entry.transaction, Some(new_tx));
-        assert!(entry.courtesy);
+        assert!(entry.transient);
     }
 }
diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index caee74179..391560f66 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -635,12 +635,6 @@ impl P2pConnManager {
                     "Failed to enqueue DropConnection command"
                 );
             }
-            // Immediately prune topology counters so we don't leak open connection slots.
- ctx.bridge - .op_manager - .ring - .prune_connection(peer.clone()) - .await; if let Some(conn) = ctx.connections.remove(&peer) { // TODO: review: this could potentially leave garbage tasks in the background with peer listener match timeout( @@ -697,7 +691,7 @@ impl P2pConnManager { .send(HandshakeCommand::ExpectInbound { peer: peer.clone(), transaction: None, - courtesy: false, + transient: false, }) .await { @@ -834,23 +828,15 @@ impl P2pConnManager { // Collect network information if config.include_network_info { - let cm = &op_manager.ring.connection_manager; - let connections_by_loc = cm.get_connections_by_location(); - let mut connected_peers = Vec::new(); - for conns in connections_by_loc.values() { - for conn in conns { - connected_peers.push(( - conn.location.peer.to_string(), - conn.location.peer.addr.to_string(), - )); - } - } - connected_peers.sort_by(|a, b| a.0.cmp(&b.0)); - connected_peers.dedup_by(|a, b| a.0 == b.0); + let connected_peers: Vec<_> = ctx + .connections + .keys() + .map(|p| (p.to_string(), p.addr.to_string())) + .collect(); response.network_info = Some(NetworkInfo { - active_connections: connected_peers.len(), connected_peers, + active_connections: ctx.connections.len(), }); } @@ -929,43 +915,28 @@ impl P2pConnManager { } } - // Collect topology-backed connection info (exclude transient transports). - let cm = &op_manager.ring.connection_manager; - let connections_by_loc = cm.get_connections_by_location(); - let mut connected_peer_ids = Vec::new(); - if config.include_detailed_peer_info { - use freenet_stdlib::client_api::ConnectedPeerInfo; - for conns in connections_by_loc.values() { - for conn in conns { - connected_peer_ids.push(conn.location.peer.to_string()); - response.connected_peers_detailed.push( - ConnectedPeerInfo { - peer_id: conn.location.peer.to_string(), - address: conn.location.peer.addr.to_string(), - }, - ); - } - } - } else { - for conns in connections_by_loc.values() { - connected_peer_ids.extend( - conns.iter().map(|c| c.location.peer.to_string()), - ); - } - } - connected_peer_ids.sort(); - connected_peer_ids.dedup(); - // Collect system metrics if config.include_system_metrics { let seeding_contracts = op_manager.ring.all_network_subscriptions().len() as u32; response.system_metrics = Some(SystemMetrics { - active_connections: connected_peer_ids.len() as u32, + active_connections: ctx.connections.len() as u32, seeding_contracts, }); } + // Collect detailed peer information if requested + if config.include_detailed_peer_info { + use freenet_stdlib::client_api::ConnectedPeerInfo; + // Populate detailed peer information from actual connections + for peer in ctx.connections.keys() { + response.connected_peers_detailed.push(ConnectedPeerInfo { + peer_id: peer.to_string(), + address: peer.addr.to_string(), + }); + } + } + match timeout( Duration::from_secs(2), callback.send(QueryResult::NodeDiagnostics(response)), @@ -1385,7 +1356,7 @@ impl P2pConnManager { .send(HandshakeCommand::Connect { peer: peer.clone(), transaction: tx, - courtesy: transient, + transient, }) .await { @@ -1453,9 +1424,9 @@ impl P2pConnManager { transaction, peer, connection, - courtesy, + transient, } => { - tracing::info!(provided = ?peer, transient = courtesy, tx = ?transaction, "InboundConnection event"); + tracing::info!(provided = ?peer, transient = transient, tx = ?transaction, "InboundConnection event"); let _conn_manager = &self.bridge.op_manager.ring.connection_manager; let remote_addr = connection.remote_addr(); @@ -1463,7 +1434,7 @@ impl P2pConnManager { if 
blocked_addrs.contains(&remote_addr) { tracing::info!( remote = %remote_addr, - transient = courtesy, + transient = transient, transaction = ?transaction, "Inbound connection blocked by local policy" ); @@ -1475,7 +1446,7 @@ impl P2pConnManager { let peer_id = peer.unwrap_or_else(|| { tracing::info!( remote = %remote_addr, - transient = courtesy, + transient = transient, transaction = ?transaction, "Inbound connection arrived without matching expectation; accepting provisionally" ); @@ -1493,14 +1464,14 @@ impl P2pConnManager { tracing::info!( remote = %peer_id.addr, - transient = courtesy, + transient = transient, transaction = ?transaction, "Inbound connection established" ); // Treat only transient connections as transient. Normal inbound dials (including // gateway bootstrap from peers) should be promoted into the ring once established. - let is_transient = courtesy; + let is_transient = transient; self.handle_successful_connection(peer_id, connection, state, None, is_transient) .await?; @@ -1509,11 +1480,11 @@ impl P2pConnManager { transaction, peer, connection, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - transient = courtesy, + transient = transient, transaction = %transaction, "Outbound connection established" ); @@ -1524,11 +1495,11 @@ impl P2pConnManager { transaction, peer, error, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - transient = courtesy, + transient = transient, transaction = %transaction, ?error, "Outbound connection failed" @@ -1550,7 +1521,7 @@ impl P2pConnManager { remote = %peer.addr, callbacks = callbacks.len(), pending_txs = ?pending_txs, - transient = courtesy, + transient, "Notifying callbacks after outbound failure" ); @@ -1740,43 +1711,17 @@ impl P2pConnManager { tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap"); } - let promote_to_ring = !is_transient || connection_manager.is_gateway(); - if newly_inserted { tracing::info!(remote = %peer_id, is_transient, "handle_successful_connection: inserted new connection entry"); let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); - if promote_to_ring { + if !is_transient { let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); - // Re-apply admission logic on promotion to avoid bypassing capacity/heuristic checks. - let should_accept = connection_manager.should_accept(loc, &peer_id); - if !should_accept { - tracing::warn!( - %peer_id, - %loc, - "handle_successful_connection: promotion rejected by admission logic" - ); - return Ok(()); - } - let current = connection_manager.connection_count(); - if current >= connection_manager.max_connections { - tracing::warn!( - %peer_id, - current_connections = current, - max_connections = connection_manager.max_connections, - %loc, - "handle_successful_connection: rejecting new connection to enforce cap" - ); - return Ok(()); - } tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring"); self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), true) + .add_connection(loc, peer_id.clone(), false) .await; - if is_transient { - connection_manager.drop_transient(&peer_id); - } } else { // Update location now that we know it; budget was reserved before any work. 
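
A condensed sketch of the decision this branch encodes, using stand-in types
(`Location` modeled as `f64`) and hypothetical callbacks in place of the real
`add_connection`/`try_register_transient` calls:

    fn on_connection_established(
        is_transient: bool,
        pending_loc: Option<f64>,
        addr_derived_loc: f64,
        add_to_ring: impl FnOnce(f64),
        register_transient: impl FnOnce(Option<f64>),
    ) {
        if !is_transient {
            // Normal dials are promoted into the ring at the pending location,
            // falling back to an address-derived location.
            add_to_ring(pending_loc.unwrap_or(addr_derived_loc));
        } else {
            // Transient links only record their location against the budget
            // slot that was reserved before any work started.
            register_transient(pending_loc);
        }
    }
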
connection_manager.try_register_transient(peer_id.clone(), pending_loc); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 4cf93d79b..0f55fab66 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -215,7 +215,6 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let dist = ring_distance(acceptor.location, self.request.joiner.location); let transient = ctx.transient_hint(&acceptor, &self.request.joiner); self.transient_hint = transient; actions.accept_response = Some(ConnectResponse { @@ -223,27 +222,15 @@ impl RelayState { transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); - tracing::info!( - acceptor_peer = %acceptor.peer, - joiner_peer = %self.request.joiner.peer, - acceptor_loc = ?acceptor.location, - joiner_loc = ?self.request.joiner.location, - ring_distance = ?dist, - transient, - "connect: acceptance issued" - ); } if self.forwarded_to.is_none() && self.request.ttl > 0 { match ctx.select_next_hop(self.request.desired_location, &self.request.visited) { Some(next) => { - let dist = ring_distance(next.location, Some(self.request.desired_location)); - tracing::info!( + tracing::debug!( target = %self.request.desired_location, ttl = self.request.ttl, next_peer = %next.peer, - next_loc = ?next.location, - ring_distance_to_target = ?dist, "connect: forwarding join request to next hop" ); let mut forward_req = self.request.clone(); @@ -255,7 +242,7 @@ impl RelayState { actions.forward = Some((next, forward_snapshot)); } None => { - tracing::info!( + tracing::debug!( target = %self.request.desired_location, ttl = self.request.ttl, visited = ?self.request.visited, @@ -781,13 +768,6 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option) - } } -fn ring_distance(a: Option, b: Option) -> Option { - match (a, b) { - (Some(a), Some(b)) => Some(a.distance(b).as_f64()), - _ => None, - } -} - #[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option, @@ -800,6 +780,15 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; + tracing::debug!( + peer = %gateway.peer, + reserved_connections = op_manager + .ring + .connection_manager + .get_reserved_connections(), + "join_ring_request: evaluating gateway connection attempt" + ); + if !op_manager .ring .connection_manager @@ -884,56 +873,71 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); + let mut in_flight_gateways = HashSet::new(); + loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); + let available_gateways: Vec<_> = unconnected_gateways + .into_iter() + .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) + .collect(); tracing::debug!( - "Connection status: open_connections = {}, unconnected_gateways = {}", - open_conns, - unconnected_gateways.len() + open_connections = open_conns, + inflight_gateway_dials = in_flight_gateways.len(), + available_gateways = available_gateways.len(), + "Connection status before join attempt" ); - let unconnected_count = unconnected_gateways.len(); + let available_count = available_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && 
available_count > 0 {
             tracing::info!(
                 "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways",
                 open_conns,
                 BOOTSTRAP_THRESHOLD,
-                number_of_parallel_connections.min(unconnected_count)
+                number_of_parallel_connections.min(available_count)
             );

-            let select_all = FuturesUnordered::new();
-            for gateway in unconnected_gateways
+            let mut select_all = FuturesUnordered::new();
+            for gateway in available_gateways
                 .into_iter()
                 .shuffle()
                 .take(number_of_parallel_connections)
             {
                 tracing::info!(%gateway, "Attempting connection to gateway");
+                in_flight_gateways.insert(gateway.peer.clone());
                 let op_manager = op_manager.clone();
+                let gateway_clone = gateway.clone();
                 select_all.push(async move {
-                    (join_ring_request(None, gateway, &op_manager).await, gateway)
+                    (
+                        join_ring_request(None, &gateway_clone, &op_manager).await,
+                        gateway_clone,
+                    )
                 });
             }

-            select_all
-                .for_each(|(res, gateway)| async move {
-                    if let Err(error) = res {
-                        if !matches!(
-                            error,
-                            OpError::ConnError(
-                                crate::node::ConnectionError::UnwantedConnection
-                            )
-                        ) {
-                            tracing::error!(
-                                %gateway,
-                                %error,
-                                "Failed while attempting connection to gateway"
-                            );
-                        }
-                    }
-                })
-                .await;
+            while let Some((res, gateway)) = select_all.next().await {
+                if let Err(error) = res {
+                    if !matches!(
+                        error,
+                        OpError::ConnError(crate::node::ConnectionError::UnwantedConnection)
+                    ) {
+                        tracing::error!(
+                            %gateway,
+                            %error,
+                            "Failed while attempting connection to gateway"
+                        );
+                    }
+                }
+                in_flight_gateways.remove(&gateway.peer);
+            }
+        } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 {
+            tracing::debug!(
+                open_connections = open_conns,
+                inflight = in_flight_gateways.len(),
+                "Below threshold but all gateways are already connected or in-flight"
+            );
         } else if open_conns >= BOOTSTRAP_THRESHOLD {
             tracing::trace!(
                 "Have {} connections (>= threshold of {}), not attempting gateway connections",
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index 96e410781..8d2744bf0 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -22,7 +22,8 @@ pub(crate) struct TransientEntry {

 #[derive(Clone)]
 pub(crate) struct ConnectionManager {
-    pending_reservations: Arc<RwLock<BTreeMap<PeerId, Location>>>,
+    open_connections: Arc<AtomicUsize>,
+    reserved_connections: Arc<AtomicUsize>,
     pub(super) location_for_peer: Arc<RwLock<BTreeMap<PeerId, Location>>>,
     pub(super) topology_manager: Arc<RwLock<TopologyManager>>,
     connections_by_location: Arc<RwLock<BTreeMap<Location, Vec<Connection>>>>,
@@ -123,7 +124,8 @@ impl ConnectionManager {
         Self {
             connections_by_location: Arc::new(RwLock::new(BTreeMap::new())),
             location_for_peer: Arc::new(RwLock::new(BTreeMap::new())),
-            pending_reservations: Arc::new(RwLock::new(BTreeMap::new())),
+            open_connections: Arc::new(AtomicUsize::new(0)),
+            reserved_connections: Arc::new(AtomicUsize::new(0)),
             topology_manager,
             own_location: own_location.into(),
             peer_key: Arc::new(Mutex::new(peer_id)),
@@ -146,8 +148,12 @@ impl ConnectionManager {
     /// Will panic if the node checking for this condition has no location assigned.
pub fn should_accept(&self, location: Location, peer_id: &PeerId) -> bool { tracing::info!("Checking if should accept connection"); - let open = self.connection_count(); - let reserved_before = self.pending_reservations.read().len(); + let open = self + .open_connections + .load(std::sync::atomic::Ordering::SeqCst); + let reserved_before = self + .reserved_connections + .load(std::sync::atomic::Ordering::SeqCst); tracing::info!( %peer_id, @@ -169,16 +175,34 @@ impl ConnectionManager { ); } - if self.location_for_peer.read().get(peer_id).is_some() { - // We've already accepted this peer (pending or active); treat as a no-op acceptance. - tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); - return true; - } - - { - let mut pending = self.pending_reservations.write(); - pending.insert(peer_id.clone(), location); - } + let reserved_before = loop { + let current = self + .reserved_connections + .load(std::sync::atomic::Ordering::SeqCst); + if current == usize::MAX { + tracing::error!( + %peer_id, + "reserved connection counter overflowed; rejecting new connection" + ); + return false; + } + match self.reserved_connections.compare_exchange( + current, + current + 1, + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + ) { + Ok(_) => break current, + Err(actual) => { + tracing::debug!( + %peer_id, + expected = current, + actual, + "reserved connection counter changed concurrently; retrying" + ); + } + } + }; let total_conn = match reserved_before .checked_add(1) @@ -192,7 +216,8 @@ impl ConnectionManager { open, "connection counters would overflow; rejecting connection" ); - self.pending_reservations.write().remove(peer_id); + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); return false; } }; @@ -202,6 +227,30 @@ impl ConnectionManager { return true; } + const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2; + if self.is_gateway { + let direct_total = open + reserved_before; + if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT { + tracing::info!( + %peer_id, + open, + reserved_before, + limit = GATEWAY_DIRECT_ACCEPT_LIMIT, + "Gateway reached direct-accept limit; forwarding join request instead" + ); + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); + return false; + } + } + + if self.location_for_peer.read().get(peer_id).is_some() { + // We've already accepted this peer (pending or active); treat as a no-op acceptance. 
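
The loop above is a hand-rolled compare-and-swap; a behaviourally equivalent
sketch using `AtomicUsize::fetch_update` (the shape of the reservation step,
not a drop-in replacement for the code above):

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Returns the previous reservation count on success, or None when the
    /// counter sits at usize::MAX and the connection must be rejected.
    fn reserve_slot(reserved: &AtomicUsize) -> Option<usize> {
        reserved
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                current.checked_add(1)
            })
            .ok()
    }
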
+ tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + return true; + } + let accepted = if total_conn < self.min_connections { tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); true @@ -228,13 +277,14 @@ impl ConnectionManager { accepted, total_conn, open_connections = open, - reserved_connections = self.pending_reservations.read().len(), - max_connections = self.max_connections, - min_connections = self.min_connections, + reserved_connections = self + .reserved_connections + .load(std::sync::atomic::Ordering::SeqCst), "should_accept: final decision" ); if !accepted { - self.pending_reservations.write().remove(peer_id); + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } else { tracing::info!(%peer_id, total_conn, "should_accept: accepted (reserving spot)"); self.record_pending_location(peer_id, location); @@ -403,30 +453,20 @@ impl ConnectionManager { tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology"); debug_assert!(self.get_peer_key().expect("should be set") != peer); if was_reserved { - self.pending_reservations.write().remove(&peer); - } - let mut lop = self.location_for_peer.write(); - let previous_location = lop.insert(peer.clone(), loc); - drop(lop); - - if let Some(prev_loc) = previous_location { - tracing::info!( - %peer, - %prev_loc, - %loc, - "add_connection: replacing existing connection for peer" - ); - let mut cbl = self.connections_by_location.write(); - if let Some(prev_list) = cbl.get_mut(&prev_loc) { - if let Some(pos) = prev_list.iter().position(|c| c.location.peer == peer) { - prev_list.swap_remove(pos); - } - if prev_list.is_empty() { - cbl.remove(&prev_loc); + let old = self + .reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + #[cfg(debug_assertions)] + { + tracing::debug!(old, "Decremented reserved connections"); + if old == 0 { + panic!("Underflow of reserved connections"); } } + let _ = old; } - + let mut lop = self.location_for_peer.write(); + lop.insert(peer.clone(), loc); { let mut cbl = self.connections_by_location.write(); cbl.entry(loc).or_default().push(Connection { @@ -434,8 +474,12 @@ impl ConnectionManager { peer: peer.clone(), location: Some(loc), }, + open_at: Instant::now(), }); } + self.open_connections + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + std::mem::drop(lop); } pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool { @@ -475,6 +519,7 @@ impl ConnectionManager { peer: new_peer, location: Some(loc), }, + open_at: Instant::now(), }); } @@ -492,12 +537,17 @@ impl ConnectionManager { tracing::debug!("no location found for peer, skip pruning"); return None; } else { - let removed = self.pending_reservations.write().remove(peer).is_some(); - if !removed { + let prev = self + .reserved_connections + .load(std::sync::atomic::Ordering::SeqCst); + if prev == 0 { tracing::warn!( %peer, - "prune_connection: no pending reservation to release for in-transit peer" + "prune_connection: no reserved slots to release for in-transit peer" ); + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } } return None; @@ -510,38 +560,23 @@ impl ConnectionManager { } } - if !is_alive { - self.pending_reservations.write().remove(peer); + if is_alive { + self.open_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } Some(loc) } - 
pub(crate) fn connection_count(&self) -> usize { - // Count only established connections tracked by location buckets. - self.connections_by_location - .read() - .values() - .map(|conns| conns.len()) - .sum() - } - - #[allow(dead_code)] pub(super) fn get_open_connections(&self) -> usize { - self.connection_count() - } - - #[allow(dead_code)] - pub(crate) fn get_reserved_connections(&self) -> usize { - self.pending_reservations.read().len() - } - - pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { - self.location_for_peer.read().contains_key(peer) - || self.pending_reservations.read().contains_key(peer) + self.open_connections + .load(std::sync::atomic::Ordering::SeqCst) } - pub(crate) fn get_connections_by_location(&self) -> BTreeMap> { + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } From af813d1ebbb8aeb3b8ac9ca101c7e7d03e1834ca Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 21:36:13 -0600 Subject: [PATCH 02/13] test: fix ExpectedInboundTracker helper for transient rename --- crates/core/src/node/network_bridge/handshake.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index c2f3c8b31..ef516a0ae 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -184,7 +184,7 @@ impl ExpectedInboundTracker { #[cfg(test)] fn contains(&self, addr: SocketAddr) -> bool { - self.entries.contains_key(&addr) + self.entries.contains_key(&addr.ip()) } } From aa84dd49d6dbcd73e6fc5af9fc86f2677cc71579 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 22:24:04 -0600 Subject: [PATCH 03/13] feat: expose connection tuning and bump test harness --- Cargo.lock | 5 +---- crates/core/Cargo.toml | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f3d68c75..6f2dfb7f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1686,7 +1686,6 @@ dependencies = [ "serde", "serde_json", "serde_with", - "sha2", "sqlx", "statrs", "stretto", @@ -1814,9 +1813,7 @@ dependencies = [ [[package]] name = "freenet-test-network" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b74dc741d17bc57e55be2a2b2dc0b15bdb4299b77b3f779d371a379611cb13" +version = "0.1.2" dependencies = [ "anyhow", "chrono", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 18695cfc2..98d266021 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -65,7 +65,6 @@ xz2 = { version = "0.1" } reqwest = { version = "0.12", features = ["json"] } rsa = { version = "0.9", features = ["serde", "pem"] } pkcs8 = { version = "0.10", features = ["std", "pem"] } -sha2 = "0.10" # Tracing deps opentelemetry = "0.31" @@ -91,7 +90,7 @@ arbitrary = { features = ["derive"], version = "1" } chrono = { features = ["arbitrary"], workspace = true } freenet-stdlib = { features = ["net", "testing"], workspace = true } freenet-macros = { path = "../freenet-macros" } -freenet-test-network = "0.1.3" +freenet-test-network = { version = "0.1.2", path = "../../../../freenet-test-network" } httptest = "0.16" statrs = "0.18" tempfile = "3" From f81a8f90ac17e955f0465d49f1419325ce8fae3c Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 22:34:56 -0600 Subject: [PATCH 04/13] fix: skip topology add when neighbor map empty --- Cargo.lock | 2 +- crates/core/src/ring/connection_manager.rs | 9 
+++ crates/core/src/ring/mod.rs | 73 +++++++++------------- 3 files changed, 39 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f2dfb7f5..b7232167a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,7 +1813,7 @@ dependencies = [ [[package]] name = "freenet-test-network" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "chrono", diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 8d2744bf0..168ff0c3e 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -576,6 +576,11 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } + pub(crate) fn get_reserved_connections(&self) -> usize { + self.reserved_connections + .load(std::sync::atomic::Ordering::SeqCst) + } + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } @@ -633,4 +638,8 @@ impl ConnectionManager { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } + + pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { + self.location_for_peer.read().contains_key(peer) + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index b23bf8ebe..438602b41 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -3,7 +3,7 @@ //! Mainly maintains a healthy and optimal pool of connections to other peers in the network //! and routes requests to the optimal peers. -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashSet}; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, @@ -157,7 +157,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager.connection_count() + self.connection_manager.get_open_connections() } async fn refresh_router(router: Arc>, register: ER) { @@ -385,6 +385,11 @@ impl Ring { tracing::info!("Initializing connection maintenance task"); let is_gateway = self.is_gateway; tracing::info!(is_gateway, "Connection maintenance task starting"); + #[cfg(not(test))] + const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); + #[cfg(test)] + const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(5); + #[cfg(not(test))] const CHECK_TICK_DURATION: Duration = Duration::from_secs(60); #[cfg(test)] @@ -455,7 +460,7 @@ impl Ring { error })?; if live_tx.is_none() { - let conns = self.connection_manager.connection_count(); + let conns = self.connection_manager.get_open_connections(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns @@ -472,50 +477,30 @@ impl Ring { } } - let current_connections = self.connection_manager.connection_count(); + let current_connections = self.connection_manager.get_open_connections(); let pending_connection_targets = pending_conn_adds.len(); - let peers = self.connection_manager.get_connections_by_location(); - let connections_considered: usize = peers.values().map(|c| c.len()).sum(); - - let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers - .iter() - .map(|(loc, conns)| { - let conns: Vec<_> = conns - .iter() - .filter(|conn| !live_tx_tracker.has_live_connection(&conn.location.peer)) - .cloned() - .collect(); - (*loc, conns) - }) - .filter(|(_, conns)| !conns.is_empty()) - .collect(); - - if neighbor_locations.is_empty() && connections_considered > 0 { - tracing::warn!( - current_connections, - connections_considered, - live_tx_peers = 
live_tx_tracker.len(), - "Neighbor filtering removed all candidates; using all connections" + let neighbor_locations = { + let peers = self.connection_manager.get_connections_by_location(); + tracing::debug!( + "Maintenance task: current connections = {}, checking topology", + current_connections ); - - neighbor_locations = peers + peers .iter() - .map(|(loc, conns)| (*loc, conns.clone())) + .map(|(loc, conns)| { + let conns: Vec<_> = conns + .iter() + .filter(|conn| { + conn.open_at.elapsed() > CONNECTION_AGE_THRESOLD + && !live_tx_tracker.has_live_connection(&conn.location.peer) + }) + .cloned() + .collect(); + (*loc, conns) + }) .filter(|(_, conns)| !conns.is_empty()) - .collect(); - } - - if current_connections > self.connection_manager.max_connections { - // When over capacity, consider all connections for removal regardless of live_tx filter. - neighbor_locations = peers.clone(); - } - - tracing::debug!( - "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", - current_connections, - peers.len(), - live_tx_tracker.len() - ); + .collect() + }; let adjustment = self .connection_manager @@ -604,7 +589,7 @@ impl Ring { live_tx_tracker: &LiveTransactionTracker, op_manager: &Arc, ) -> anyhow::Result> { - let current_connections = self.connection_manager.connection_count(); + let current_connections = self.connection_manager.get_open_connections(); let is_gateway = self.is_gateway; tracing::info!( From bd01f38e0bcef5708aa0e39b0b318471142f19bd Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 21 Nov 2025 23:51:49 -0600 Subject: [PATCH 05/13] feat: add connect forwarding recency filter --- crates/core/src/operations/connect.rs | 204 +++++++++++---------- crates/core/src/ring/connection_manager.rs | 78 ++++++-- 2 files changed, 165 insertions(+), 117 deletions(-) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 0f55fab66..f4c929248 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -3,7 +3,7 @@ //! The legacy multi-stage connect operation has been removed; this module now powers the node’s //! connection and maintenance paths end-to-end. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::net::SocketAddr; use std::sync::Arc; @@ -86,9 +86,8 @@ impl fmt::Display for ConnectMsg { ), ConnectMsg::Response { sender, target, payload, .. } => write!( f, - "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {}, transient: {} }}", + "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {} }}", payload.acceptor, - payload.transient ), ConnectMsg::ObservedAddress { target, address, .. } => { write!(f, "ObservedAddress {{ target: {target}, address: {address} }}") @@ -126,8 +125,6 @@ pub(crate) struct ConnectRequest { pub(crate) struct ConnectResponse { /// The peer that accepted the join request. pub acceptor: PeerKeyLocation, - /// Whether this acceptance is a short-lived transient link. - pub transient: bool, } /// New minimal state machine the joiner tracks. @@ -154,7 +151,6 @@ pub(crate) struct RelayState { pub upstream: PeerKeyLocation, pub request: ConnectRequest, pub forwarded_to: Option, - pub transient_hint: bool, pub observed_sent: bool, pub accepted_locally: bool, } @@ -173,10 +169,8 @@ pub(crate) trait RelayContext { &self, desired_location: Location, visited: &[PeerKeyLocation], + recency: &HashMap, ) -> Option; - - /// Whether the acceptance should be treated as a short-lived transient link. 
- fn transient_hint(&self, acceptor: &PeerKeyLocation, joiner: &PeerKeyLocation) -> bool; } /// Result of processing a request at a relay. @@ -193,6 +187,7 @@ impl RelayState { &mut self, ctx: &C, observed_remote: &PeerKeyLocation, + recency: &HashMap, ) -> RelayActions { let mut actions = RelayActions::default(); push_unique_peer(&mut self.request.visited, observed_remote.clone()); @@ -215,22 +210,32 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let transient = ctx.transient_hint(&acceptor, &self.request.joiner); - self.transient_hint = transient; + let dist = ring_distance(acceptor.location, self.request.joiner.location); actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), - transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); + tracing::info!( + acceptor_peer = %acceptor.peer, + joiner_peer = %self.request.joiner.peer, + acceptor_loc = ?acceptor.location, + joiner_loc = ?self.request.joiner.location, + ring_distance = ?dist, + "connect: acceptance issued" + ); } if self.forwarded_to.is_none() && self.request.ttl > 0 { - match ctx.select_next_hop(self.request.desired_location, &self.request.visited) { + match ctx.select_next_hop(self.request.desired_location, &self.request.visited, recency) + { Some(next) => { - tracing::debug!( + let dist = ring_distance(next.location, Some(self.request.desired_location)); + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, next_peer = %next.peer, + next_loc = ?next.location, + ring_distance_to_target = ?dist, "connect: forwarding join request to next hop" ); let mut forward_req = self.request.clone(); @@ -242,7 +247,7 @@ impl RelayState { actions.forward = Some((next, forward_snapshot)); } None => { - tracing::debug!( + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, visited = ?self.request.visited, @@ -290,27 +295,51 @@ impl RelayContext for RelayEnv<'_> { &self, desired_location: Location, visited: &[PeerKeyLocation], + recency: &HashMap, ) -> Option { let skip = VisitedPeerIds { peers: visited }; let router = self.op_manager.ring.router.read(); - self.op_manager + let candidates = self + .op_manager .ring .connection_manager - .routing(desired_location, None, skip, &router) - } + .routing_candidates(desired_location, None, skip); + + // Prefer least recently forwarded peers. Missing recency wins; otherwise pick the oldest + // recency bucket, then let the router choose among that bucket. This keeps routing bias + // toward the target while avoiding hammering a single neighbor. + let mut best_key: Option> = None; + let mut best: Vec = Vec::new(); + for cand in candidates { + let key = recency.get(&cand.peer).cloned(); + match best_key { + None => { + best_key = Some(key); + best = vec![cand.clone()]; + } + Some(k) => { + if key < k { + best_key = Some(key); + best = vec![cand.clone()]; + } else if key == k { + best.push(cand.clone()); + } + } + } + } - fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { - // Courtesy slots still piggyback on regular connections. Flag the first acceptance so the - // joiner can prioritise it, and keep the logic simple until dedicated transient tracking - // is wired in (see transient-connection-budget branch). 
- self.op_manager.ring.open_connections() == 0 + if best.is_empty() { + None + } else { + router.select_peer(best.iter(), desired_location).cloned() + } } + } #[derive(Debug)] pub struct AcceptedPeer { pub peer: PeerKeyLocation, - pub transient: bool, } #[derive(Debug, Default)] @@ -331,7 +360,6 @@ impl JoinerState { self.last_progress = now; acceptance.new_acceptor = Some(AcceptedPeer { peer: response.acceptor.clone(), - transient: response.transient, }); acceptance.assigned_location = self.accepted.len() == 1; } @@ -355,6 +383,10 @@ pub(crate) struct ConnectOp { pub(crate) gateway: Option>, pub(crate) backoff: Option, pub(crate) desired_location: Option, + /// Tracks when we last forwarded this connect to a peer, to avoid hammering the same + /// neighbors when no acceptors are available. Peers without an entry are treated as + /// immediately eligible. + recency: HashMap, } impl ConnectOp { @@ -379,6 +411,7 @@ impl ConnectOp { gateway: gateway.map(Box::new), backoff, desired_location: Some(desired_location), + recency: HashMap::new(), } } @@ -391,7 +424,6 @@ impl ConnectOp { upstream, request, forwarded_to: None, - transient_hint: false, observed_sent: false, accepted_locally: false, })); @@ -401,6 +433,7 @@ impl ConnectOp { gateway: None, backoff: None, desired_location: None, + recency: HashMap::new(), } } @@ -480,7 +513,15 @@ impl ConnectOp { ) -> Option { match self.state.as_mut() { Some(ConnectState::WaitingForResponses(state)) => { + tracing::info!( + acceptor = %response.acceptor.peer, + acceptor_loc = ?response.acceptor.location, + "connect: joiner received ConnectResponse" + ); let result = state.register_acceptance(response, now); + if let Some(new_acceptor) = &result.new_acceptor { + self.recency.remove(&new_acceptor.peer.peer); + } if result.satisfied { self.state = Some(ConnectState::Completed); } @@ -507,7 +548,6 @@ impl ConnectOp { upstream: upstream.clone(), request: request.clone(), forwarded_to: None, - transient_hint: false, observed_sent: false, accepted_locally: false, }))); @@ -518,7 +558,7 @@ impl ConnectOp { state.upstream = upstream; state.request = request; let upstream_snapshot = state.upstream.clone(); - state.handle_request(ctx, &upstream_snapshot) + state.handle_request(ctx, &upstream_snapshot, &self.recency) } _ => RelayActions::default(), } @@ -603,6 +643,8 @@ impl Operation for ConnectOp { } if let Some((next, request)) = actions.forward { + // Record recency for this forward to avoid hammering the same neighbor. 
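
The recency preference above reduces to "peers never forwarded to sort first,
then the oldest forward wins", which falls out of `Option<Instant>` ordering
(`None` sorts before any `Some`). A self-contained sketch of that bucket
selection, generic over a hypothetical peer type `P`:

    use std::collections::HashMap;
    use std::hash::Hash;
    use std::time::Instant;

    fn least_recently_forwarded<'a, P: Eq + Hash>(
        candidates: &'a [P],
        recency: &HashMap<P, Instant>,
    ) -> Vec<&'a P> {
        // Minimum over Option<Instant>: missing entries (None) win outright.
        let best_key = candidates.iter().map(|c| recency.get(c).copied()).min();
        match best_key {
            Some(key) => candidates
                .iter()
                .filter(|c| recency.get(*c).copied() == key)
                .collect(),
            None => Vec::new(),
        }
    }

The router then picks among the returned bucket, keeping the bias toward the
target location while spreading load across equally recent neighbors.
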
+ self.recency.insert(next.peer.clone(), Instant::now()); let forward_msg = ConnectMsg::Request { id: self.id, from: env.self_location().clone(), @@ -666,7 +708,7 @@ impl Operation for ConnectOp { peer: new_acceptor.peer.peer.clone(), tx: self.id, callback, - is_gw: new_acceptor.transient, + is_gw: false, }) .await?; @@ -763,11 +805,19 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option) - gateway: op.gateway.clone(), backoff: op.backoff.clone(), desired_location: op.desired_location, + recency: op.recency.clone(), })) }), } } +fn ring_distance(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(a.distance(b).as_f64()), + _ => None, + } +} + #[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option, @@ -780,15 +830,6 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; - tracing::debug!( - peer = %gateway.peer, - reserved_connections = op_manager - .ring - .connection_manager - .get_reserved_connections(), - "join_ring_request: evaluating gateway connection attempt" - ); - if !op_manager .ring .connection_manager @@ -873,71 +914,56 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); - let mut in_flight_gateways = HashSet::new(); - loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); - let available_gateways: Vec<_> = unconnected_gateways - .into_iter() - .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) - .collect(); tracing::debug!( - open_connections = open_conns, - inflight_gateway_dials = in_flight_gateways.len(), - available_gateways = available_gateways.len(), - "Connection status before join attempt" + "Connection status: open_connections = {}, unconnected_gateways = {}", + open_conns, + unconnected_gateways.len() ); - let available_count = available_gateways.len(); + let unconnected_count = unconnected_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { tracing::info!( "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways", open_conns, BOOTSTRAP_THRESHOLD, - number_of_parallel_connections.min(available_count) + number_of_parallel_connections.min(unconnected_count) ); - let mut select_all = FuturesUnordered::new(); - for gateway in available_gateways + let select_all = FuturesUnordered::new(); + for gateway in unconnected_gateways .into_iter() .shuffle() .take(number_of_parallel_connections) { tracing::info!(%gateway, "Attempting connection to gateway"); - in_flight_gateways.insert(gateway.peer.clone()); let op_manager = op_manager.clone(); - let gateway_clone = gateway.clone(); select_all.push(async move { - ( - join_ring_request(None, &gateway_clone, &op_manager).await, - gateway_clone, - ) + (join_ring_request(None, gateway, &op_manager).await, gateway) }); } - while let Some((res, gateway)) = select_all.next().await { - if let Err(error) = res { - if !matches!( - error, - OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) - ) { - tracing::error!( - %gateway, - %error, - "Failed while attempting connection to gateway" - ); + select_all + .for_each(|(res, gateway)| async move { + if let Err(error) = res { + if !matches!( + error, + OpError::ConnError( + crate::node::ConnectionError::UnwantedConnection + ) + ) { + tracing::error!( + %gateway, + 
%error, + "Failed while attempting connection to gateway" + ); + } } - } - in_flight_gateways.remove(&gateway.peer); - } - } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 { - tracing::debug!( - open_connections = open_conns, - inflight = in_flight_gateways.len(), - "Below threshold but all gateways are already connected or in-flight" - ); + }) + .await; } else if open_conns >= BOOTSTRAP_THRESHOLD { tracing::trace!( "Have {} connections (>= threshold of {}), not attempting gateway connections", @@ -984,7 +1010,6 @@ mod tests { self_loc: PeerKeyLocation, accept: bool, next_hop: Option, - transient: bool, } impl TestRelayContext { @@ -993,7 +1018,6 @@ mod tests { self_loc, accept: true, next_hop: None, - transient: false, } } @@ -1006,11 +1030,6 @@ mod tests { self.next_hop = hop; self } - - fn transient(mut self, transient: bool) -> Self { - self.transient = transient; - self - } } impl RelayContext for TestRelayContext { @@ -1026,13 +1045,10 @@ mod tests { &self, _desired_location: Location, _visited: &[PeerKeyLocation], + _recency: &HashMap, ) -> Option { self.next_hop.clone() } - - fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { - self.transient - } } fn make_peer(port: u16) -> PeerKeyLocation { @@ -1058,17 +1074,15 @@ mod tests { observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - transient_hint: false, observed_sent: false, accepted_locally: false, }; - let ctx = TestRelayContext::new(self_loc.clone()).transient(true); + let ctx = TestRelayContext::new(self_loc.clone()); let actions = state.handle_request(&ctx, &joiner); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); - assert!(response.transient); assert_eq!(actions.expect_connection_from.unwrap().peer, joiner.peer); assert!(actions.forward.is_none()); } @@ -1088,7 +1102,6 @@ mod tests { observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1123,7 +1136,6 @@ mod tests { observed_addr: Some(observed_addr), }, forwarded_to: None, - transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1151,13 +1163,11 @@ mod tests { let response = ConnectResponse { acceptor: acceptor.clone(), - transient: false, }; let result = state.register_acceptance(&response, Instant::now()); assert!(result.satisfied); let new = result.new_acceptor.expect("expected new acceptor"); assert_eq!(new.peer.peer, acceptor.peer); - assert!(!new.transient); } #[test] diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 168ff0c3e..043a93fa8 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -597,29 +597,67 @@ impl ConnectionManager { skip_list: impl Contains, router: &Router, ) -> Option { + let candidates = self.routing_candidates(target, requesting, skip_list); + + if candidates.is_empty() { + return None; + } + + router.select_peer(candidates.iter(), target).cloned() + } + + /// Gather routing candidates after applying skip/transient filters. 
+    pub fn routing_candidates(
+        &self,
+        target: Location,
+        requesting: Option<&PeerId>,
+        skip_list: impl Contains<PeerId>,
+    ) -> Vec<PeerKeyLocation> {
         let connections = self.connections_by_location.read();
-        tracing::debug!(
-            total_locations = connections.len(),
-            self_peer = self
-                .get_peer_key()
-                .as_ref()
-                .map(|id| id.to_string())
-                .unwrap_or_else(|| "unknown".into()),
-            "routing: considering connections"
-        );
-        let peers = connections.values().filter_map(|conns| {
-            let conn = conns.choose(&mut rand::rng())?;
-            if self.is_transient(&conn.location.peer) {
-                return None;
-            }
-            if let Some(requester) = requesting {
-                if requester == &conn.location.peer {
+        let candidates: Vec<PeerKeyLocation> = connections
+            .values()
+            .filter_map(|conns| {
+                let conn = conns.choose(&mut rand::rng())?;
+                if self.is_transient(&conn.location.peer) {
                     return None;
                 }
-            }
-            (!skip_list.has_element(conn.location.peer.clone())).then_some(&conn.location)
-        });
-        router.select_peer(peers, target).cloned()
+                if let Some(requester) = requesting {
+                    if requester == &conn.location.peer {
+                        return None;
+                    }
+                }
+                (!skip_list.has_element(conn.location.peer.clone()))
+                    .then_some(conn.location.clone())
+            })
+            .collect();
+
+        if candidates.is_empty() {
+            tracing::info!(
+                total_locations = connections.len(),
+                candidates = 0,
+                target = %target,
+                self_peer = self
+                    .get_peer_key()
+                    .as_ref()
+                    .map(|id| id.to_string())
+                    .unwrap_or_else(|| "unknown".into()),
+                "routing: no non-transient candidates"
+            );
+        } else {
+            tracing::info!(
+                total_locations = connections.len(),
+                candidates = candidates.len(),
+                target = %target,
+                self_peer = self
+                    .get_peer_key()
+                    .as_ref()
+                    .map(|id| id.to_string())
+                    .unwrap_or_else(|| "unknown".into()),
+                "routing: selecting next hop"
+            );
+        }
+
+        candidates
     }

     pub fn num_connections(&self) -> usize {

From 0fc90ffa5bba25d1a7418f9459a00e906bdebc26 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 10:09:15 -0600
Subject: [PATCH 06/13] chore: use published freenet-test-network

---
 Cargo.lock             | 2 ++
 crates/core/Cargo.toml | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index b7232167a..26753b4cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1814,6 +1814,8 @@ dependencies = [
 [[package]]
 name = "freenet-test-network"
 version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5b74dc741d17bc57e55be2a2b2dc0b15bdb4299b77b3f779d371a379611cb13"
 dependencies = [
  "anyhow",
  "chrono",
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 98d266021..9e5fbd64b 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -90,7 +90,7 @@ arbitrary = { features = ["derive"], version = "1" }
 chrono = { features = ["arbitrary"], workspace = true }
 freenet-stdlib = { features = ["net", "testing"], workspace = true }
 freenet-macros = { path = "../freenet-macros" }
-freenet-test-network = { version = "0.1.2", path = "../../../../freenet-test-network" }
+freenet-test-network = "0.1.3"
 httptest = "0.16"
 statrs = "0.18"
 tempfile = "3"

From 8738cd6a51d3dcd830e7e85ce56eca350b93e77c Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 10:27:39 -0600
Subject: [PATCH 07/13] test: avoid static test network

---
 crates/core/src/operations/connect.rs         | 51 ++++++++++++-------
 crates/core/src/ring/connection_manager.rs    |  1 +
 crates/core/tests/test_network_integration.rs |  2 +-
 3 files changed, 35 insertions(+), 19 deletions(-)

diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs
index 
f4c929248..456ebd54a 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -77,20 +77,30 @@ impl InnerMessage for ConnectMsg { impl fmt::Display for ConnectMsg { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ConnectMsg::Request { target, payload, .. } => write!( + ConnectMsg::Request { + target, payload, .. + } => write!( f, "ConnectRequest {{ target: {target}, desired: {}, ttl: {}, joiner: {} }}", - payload.desired_location, - payload.ttl, - payload.joiner + payload.desired_location, payload.ttl, payload.joiner ), - ConnectMsg::Response { sender, target, payload, .. } => write!( + ConnectMsg::Response { + sender, + target, + payload, + .. + } => write!( f, "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {} }}", payload.acceptor, ), - ConnectMsg::ObservedAddress { target, address, .. } => { - write!(f, "ObservedAddress {{ target: {target}, address: {address} }}") + ConnectMsg::ObservedAddress { + target, address, .. + } => { + write!( + f, + "ObservedAddress {{ target: {target}, address: {address} }}" + ) } } } @@ -226,8 +236,11 @@ impl RelayState { } if self.forwarded_to.is_none() && self.request.ttl > 0 { - match ctx.select_next_hop(self.request.desired_location, &self.request.visited, recency) - { + match ctx.select_next_hop( + self.request.desired_location, + &self.request.visited, + recency, + ) { Some(next) => { let dist = ring_distance(next.location, Some(self.request.desired_location)); tracing::info!( @@ -299,11 +312,11 @@ impl RelayContext for RelayEnv<'_> { ) -> Option { let skip = VisitedPeerIds { peers: visited }; let router = self.op_manager.ring.router.read(); - let candidates = self - .op_manager - .ring - .connection_manager - .routing_candidates(desired_location, None, skip); + let candidates = self.op_manager.ring.connection_manager.routing_candidates( + desired_location, + None, + skip, + ); // Prefer least recently forwarded peers. Missing recency wins; otherwise pick the oldest // recency bucket, then let the router choose among that bucket. 
This keeps routing bias @@ -334,7 +347,6 @@ impl RelayContext for RelayEnv<'_> { router.select_peer(best.iter(), desired_location).cloned() } } - } #[derive(Debug)] @@ -1079,7 +1091,8 @@ mod tests { }; let ctx = TestRelayContext::new(self_loc.clone()); - let actions = state.handle_request(&ctx, &joiner); + let recency = HashMap::new(); + let actions = state.handle_request(&ctx, &joiner, &recency); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); @@ -1109,7 +1122,8 @@ mod tests { let ctx = TestRelayContext::new(self_loc) .accept(false) .next_hop(Some(next_hop.clone())); - let actions = state.handle_request(&ctx, &joiner); + let recency = HashMap::new(); + let actions = state.handle_request(&ctx, &joiner, &recency); assert!(actions.accept_response.is_none()); let (forward_to, request) = actions.forward.expect("expected forward"); @@ -1141,7 +1155,8 @@ mod tests { }; let ctx = TestRelayContext::new(self_loc); - let actions = state.handle_request(&ctx, &joiner); + let recency = HashMap::new(); + let actions = state.handle_request(&ctx, &joiner, &recency); let (target, addr) = actions .observed_address diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 043a93fa8..3a3eaacdd 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -576,6 +576,7 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } + #[allow(dead_code)] pub(crate) fn get_reserved_connections(&self) -> usize { self.reserved_connections .load(std::sync::atomic::Ordering::SeqCst) diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs index 3373d5cd0..e130d93fb 100644 --- a/crates/core/tests/test_network_integration.rs +++ b/crates/core/tests/test_network_integration.rs @@ -7,7 +7,7 @@ use freenet_test_network::TestNetwork; use testresult::TestResult; use tokio_tungstenite::connect_async; -// Helper to get or create network +// Build a fresh network for each test to avoid static Sync requirements async fn get_network() -> TestNetwork { TestNetwork::builder() .gateways(1) From 0bca7a9275c929bdbf5d418fc51a15b55ccc2a72 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 22 Nov 2025 10:38:23 -0600 Subject: [PATCH 08/13] fix: overwrite expected inbound for same peer --- crates/core/src/node/network_bridge/handshake.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index ef516a0ae..99f527f77 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -138,14 +138,14 @@ impl ExpectedInboundTracker { tx = ?transaction, "ExpectInbound: registering expectation" ); - self.entries - .entry(peer.addr.ip()) - .or_default() - .push(ExpectedInbound { - peer, - transaction, - transient, - }); + let list = self.entries.entry(peer.addr.ip()).or_default(); + // Replace any existing expectation for the same peer/port to ensure the newest registration wins. 
From 0bca7a9275c929bdbf5d418fc51a15b55ccc2a72 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 10:38:23 -0600
Subject: [PATCH 08/13] fix: overwrite expected inbound for same peer

---
 crates/core/src/node/network_bridge/handshake.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs
index ef516a0ae..99f527f77 100644
--- a/crates/core/src/node/network_bridge/handshake.rs
+++ b/crates/core/src/node/network_bridge/handshake.rs
@@ -138,14 +138,14 @@ impl ExpectedInboundTracker {
             tx = ?transaction,
             "ExpectInbound: registering expectation"
         );
-        self.entries
-            .entry(peer.addr.ip())
-            .or_default()
-            .push(ExpectedInbound {
-                peer,
-                transaction,
-                transient,
-            });
+        let list = self.entries.entry(peer.addr.ip()).or_default();
+        // Replace any existing expectation for the same peer/port to ensure the newest registration wins.
+        list.retain(|entry| entry.peer.addr.port() != peer.addr.port());
+        list.push(ExpectedInbound {
+            peer,
+            transaction,
+            transient,
+        });
     }
 
     fn drop_peer(&mut self, peer: &PeerId) {
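The overwrite rule this commit introduces is easiest to see with the tracker reduced to a map from IP to (port, entry) pairs. A minimal sketch under that simplification (the real `ExpectedInbound` carries the peer, transaction, and transient flag):

    use std::collections::HashMap;
    use std::net::{IpAddr, SocketAddr};

    struct Tracker {
        // (port, label) stands in for the full ExpectedInbound entry.
        entries: HashMap<IpAddr, Vec<(u16, &'static str)>>,
    }

    impl Tracker {
        fn register(&mut self, addr: SocketAddr, label: &'static str) {
            let list = self.entries.entry(addr.ip()).or_default();
            // Drop any stale expectation for the same port before pushing,
            // so re-registering a peer leaves exactly one, newest entry.
            list.retain(|(port, _)| *port != addr.port());
            list.push((addr.port(), label));
        }
    }

    fn main() {
        let mut tracker = Tracker { entries: HashMap::new() };
        let addr: SocketAddr = "10.0.0.1:7000".parse().unwrap();
        tracker.register(addr, "first");
        tracker.register(addr, "second");
        // Only the most recent registration for this ip:port survives.
        assert_eq!(tracker.entries[&addr.ip()], vec![(7000, "second")]);
    }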
From a543eb4862039fcd5b15ba5d73a1729f237f9955 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 10:52:20 -0600
Subject: [PATCH 09/13] test: give mesh connectivity more time

---
 crates/core/tests/connectivity.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs
index f64e7b744..53c704097 100644
--- a/crates/core/tests/connectivity.rs
+++ b/crates/core/tests/connectivity.rs
@@ -309,7 +309,7 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResult
     let mut client2 = WebApi::start(stream2);
 
     // Retry loop to wait for full mesh connectivity
-    // CI can be slower; give more attempts before declaring failure.
+    // Allow extra time in CI for peers to form a full mesh before running contract ops.
     const MAX_RETRIES: usize = 60;
     const RETRY_DELAY: Duration = Duration::from_secs(1);
     let mut mesh_established = false;

From 34aeb31de78621863bed2c7f3136619a2ff44008 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 11:17:40 -0600
Subject: [PATCH 10/13] test: restore mesh retry budget

---
 crates/core/tests/connectivity.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs
index 53c704097..bbd99a71f 100644
--- a/crates/core/tests/connectivity.rs
+++ b/crates/core/tests/connectivity.rs
@@ -309,8 +309,7 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResult
     let mut client2 = WebApi::start(stream2);
 
     // Retry loop to wait for full mesh connectivity
-    // Allow extra time in CI for peers to form a full mesh before running contract ops.
-    const MAX_RETRIES: usize = 60;
+    const MAX_RETRIES: usize = 30;
     const RETRY_DELAY: Duration = Duration::from_secs(1);
     let mut mesh_established = false;
     let mut last_snapshot = (String::new(), String::new(), String::new());
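Both commits are tuning the same poll-until-deadline shape. A condensed sketch of that loop, where `mesh_ready` is an assumed closure standing in for the snapshot checks the test performs over each client's WebSocket API:

    use std::time::Duration;

    const MAX_RETRIES: usize = 30;
    const RETRY_DELAY: Duration = Duration::from_secs(1);

    async fn wait_for_full_mesh(mut mesh_ready: impl FnMut() -> bool) -> bool {
        for attempt in 0..MAX_RETRIES {
            if mesh_ready() {
                return true;
            }
            tracing::debug!(attempt, "mesh not established yet; retrying");
            tokio::time::sleep(RETRY_DELAY).await;
        }
        false
    }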
From a50b58d7ec6611ec502ba6cbf93dec9f0bf186a9 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 12:21:19 -0600
Subject: [PATCH 11/13] fix: promote transient gateway links

---
 .../core/src/node/network_bridge/p2p_protoc.rs |  7 ++++++-
 crates/core/src/ring/connection_manager.rs     | 18 ------------------
 2 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index 391560f66..275f6f2e9 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -1711,10 +1711,12 @@ impl P2pConnManager {
             tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap");
         }
 
+        let promote_to_ring = !is_transient || connection_manager.is_gateway();
+
         if newly_inserted {
             tracing::info!(remote = %peer_id, is_transient, "handle_successful_connection: inserted new connection entry");
             let pending_loc = connection_manager.prune_in_transit_connection(&peer_id);
-            if !is_transient {
+            if promote_to_ring {
                 let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
                 tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring");
                 self.bridge
@@ -1722,6 +1724,9 @@ impl P2pConnManager {
                     .ring
                     .add_connection(loc, peer_id.clone(), false)
                     .await;
+                if is_transient {
+                    connection_manager.drop_transient(&peer_id);
+                }
             } else {
                 // Update location now that we know it; budget was reserved before any work.
                 connection_manager.try_register_transient(peer_id.clone(), pending_loc);
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index 3a3eaacdd..1706ca2ba 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -227,24 +227,6 @@ impl ConnectionManager {
             return true;
         }
 
-        const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;
-        if self.is_gateway {
-            let direct_total = open + reserved_before;
-            if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT {
-                tracing::info!(
-                    %peer_id,
-                    open,
-                    reserved_before,
-                    limit = GATEWAY_DIRECT_ACCEPT_LIMIT,
-                    "Gateway reached direct-accept limit; forwarding join request instead"
-                );
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-                tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead");
-                return false;
-            }
-        }
-
         if self.location_for_peer.read().get(peer_id).is_some() {
             // We've already accepted this peer (pending or active); treat as a no-op acceptance.
             tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
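The behavioral change here reduces to a single predicate: non-transient links are always promoted into the ring, and gateways now promote transient links too, so freshly joined peers stay routable for forwarding. A trivial distillation of that rule, with plain booleans in place of the connection manager's state:

    /// Decide whether a newly established link joins the ring topology.
    /// Gateways promote even transient links so joiners become reachable;
    /// other peers leave transients to expire via their TTL.
    fn promote_to_ring(is_transient: bool, is_gateway: bool) -> bool {
        !is_transient || is_gateway
    }

    fn main() {
        assert!(promote_to_ring(false, false)); // regular link: promoted
        assert!(promote_to_ring(true, true));   // transient on a gateway: promoted
        assert!(!promote_to_ring(true, false)); // transient elsewhere: stays transient
    }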
From dfdc7d7057767cdc5a18db3582bd2386fd1ab933 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sat, 22 Nov 2025 20:30:42 -0600
Subject: [PATCH 12/13] fix: transient promotion admission checks and three-hop hardening

---
 .../src/node/network_bridge/p2p_protoc.rs  | 119 +++++++++++++-----
 crates/core/src/ring/connection_manager.rs |   8 +-
 crates/core/tests/connectivity.rs          |   5 +-
 3 files changed, 97 insertions(+), 35 deletions(-)

diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index 275f6f2e9..15d1b1534 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -1426,7 +1426,7 @@ impl P2pConnManager {
                 connection,
                 transient,
             } => {
-                tracing::info!(provided = ?peer, transient = transient, tx = ?transaction, "InboundConnection event");
+                tracing::info!(provided = ?peer, transient, tx = ?transaction, "InboundConnection event");
 
                 let _conn_manager = &self.bridge.op_manager.ring.connection_manager;
                 let remote_addr = connection.remote_addr();
@@ -1434,7 +1434,7 @@ impl P2pConnManager {
                 if blocked_addrs.contains(&remote_addr) {
                     tracing::info!(
                         remote = %remote_addr,
-                        transient = transient,
+                        transient,
                         transaction = ?transaction,
                         "Inbound connection blocked by local policy"
                     );
@@ -1446,7 +1446,7 @@ impl P2pConnManager {
                 let peer_id = peer.unwrap_or_else(|| {
                     tracing::info!(
                         remote = %remote_addr,
-                        transient = transient,
+                        transient,
                         transaction = ?transaction,
                         "Inbound connection arrived without matching expectation; accepting provisionally"
                     );
@@ -1464,7 +1464,7 @@ impl P2pConnManager {
 
                 tracing::info!(
                     remote = %peer_id.addr,
-                    transient = transient,
+                    transient,
                     transaction = ?transaction,
                     "Inbound connection established"
                 );
@@ -1484,7 +1484,7 @@ impl P2pConnManager {
             } => {
                 tracing::info!(
                     remote = %peer.addr,
-                    transient = transient,
+                    transient,
                     transaction = %transaction,
                     "Outbound connection established"
                 );
@@ -1499,7 +1499,7 @@ impl P2pConnManager {
             } => {
                 tracing::info!(
                     remote = %peer.addr,
-                    transient = transient,
+                    transient,
                     transaction = %transaction,
                     ?error,
                     "Outbound connection failed"
@@ -1718,6 +1718,33 @@ impl P2pConnManager {
             let pending_loc = connection_manager.prune_in_transit_connection(&peer_id);
             if promote_to_ring {
                 let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
+                tracing::info!(
+                    remote = %peer_id,
+                    %loc,
+                    pending_loc_known = pending_loc.is_some(),
+                    "handle_successful_connection: evaluating promotion to ring"
+                );
+                // Re-apply admission logic on promotion to avoid bypassing capacity/heuristic checks.
+                let should_accept = connection_manager.should_accept(loc, &peer_id);
+                if !should_accept {
+                    tracing::warn!(
+                        %peer_id,
+                        %loc,
+                        "handle_successful_connection: promotion rejected by admission logic"
+                    );
+                    return Ok(());
+                }
+                let current = connection_manager.num_connections();
+                if current >= connection_manager.max_connections {
+                    tracing::warn!(
+                        %peer_id,
+                        current_connections = current,
+                        max_connections = connection_manager.max_connections,
+                        %loc,
+                        "handle_successful_connection: rejecting new connection to enforce cap"
+                    );
+                    return Ok(());
+                }
                 tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring");
                 self.bridge
                     .op_manager
@@ -1728,32 +1755,62 @@ impl P2pConnManager {
                     connection_manager.drop_transient(&peer_id);
                 }
             } else {
-                // Update location now that we know it; budget was reserved before any work.
-                connection_manager.try_register_transient(peer_id.clone(), pending_loc);
-                tracing::info!(
-                    peer = %peer_id,
-                    "Registered transient connection (not added to ring topology)"
-                );
-                let ttl = connection_manager.transient_ttl();
-                let drop_tx = self.bridge.ev_listener_tx.clone();
-                let cm = connection_manager.clone();
-                let peer = peer_id.clone();
-                tokio::spawn(async move {
-                    sleep(ttl).await;
-                    if cm.drop_transient(&peer).is_some() {
-                        tracing::info!(%peer, "Transient connection expired; dropping");
-                        if let Err(err) = drop_tx
-                            .send(Right(NodeEvent::DropConnection(peer.clone())))
-                            .await
-                        {
-                            tracing::warn!(
-                                %peer,
-                                ?err,
-                                "Failed to dispatch DropConnection for expired transient"
-                            );
-                        }
+                let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
+                // Evaluate whether this transient should be promoted; gateways need routable peers.
+                let should_accept = connection_manager.should_accept(loc, &peer_id);
+                if should_accept {
+                    connection_manager.drop_transient(&peer_id);
+                    let current = connection_manager.num_connections();
+                    if current >= connection_manager.max_connections {
+                        tracing::warn!(
+                            %peer_id,
+                            current_connections = current,
+                            max_connections = connection_manager.max_connections,
+                            %loc,
+                            "handle_successful_connection: rejecting transient promotion to enforce cap"
+                        );
+                        return Ok(());
                     }
-                });
+                    tracing::info!(
+                        remote = %peer_id,
+                        %loc,
+                        pending_loc_known = pending_loc.is_some(),
+                        "handle_successful_connection: promoting transient into ring"
+                    );
+                    self.bridge
+                        .op_manager
+                        .ring
+                        .add_connection(loc, peer_id.clone(), true)
+                        .await;
+                } else {
+                    // Keep the connection as transient; budget was reserved before any work.
+                    connection_manager.try_register_transient(peer_id.clone(), pending_loc);
+                    tracing::info!(
+                        peer = %peer_id,
+                        pending_loc_known = pending_loc.is_some(),
+                        "Registered transient connection (not added to ring topology)"
+                    );
+                    let ttl = connection_manager.transient_ttl();
+                    let drop_tx = self.bridge.ev_listener_tx.clone();
+                    let cm = connection_manager.clone();
+                    let peer = peer_id.clone();
+                    tokio::spawn(async move {
+                        sleep(ttl).await;
+                        if cm.drop_transient(&peer).is_some() {
+                            tracing::info!(%peer, "Transient connection expired; dropping");
+                            if let Err(err) = drop_tx
+                                .send(Right(NodeEvent::DropConnection(peer.clone())))
+                                .await
+                            {
+                                tracing::warn!(
                                    %peer,
                                    ?err,
                                    "Failed to dispatch DropConnection for expired transient"
                                );
+                            }
+                        }
+                    });
+                }
             }
         } else if is_transient {
            // We reserved budget earlier, but didn't take ownership of the connection.
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index 1706ca2ba..c63638b2a 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -50,11 +50,15 @@ impl ConnectionManager {
             Ring::DEFAULT_MIN_CONNECTIONS
         };
 
-        let max_connections = if let Some(v) = config.max_number_conn {
+        let mut max_connections = if let Some(v) = config.max_number_conn {
             v
         } else {
             Ring::DEFAULT_MAX_CONNECTIONS
         };
+        // Gateways benefit from a wider neighbor set for forwarding; default to a higher cap when unset.
+        if config.is_gateway && config.max_number_conn.is_none() {
+            max_connections = 20;
+        }
 
         let max_upstream_bandwidth = if let Some(v) = config.max_upstream_bandwidth {
             v
@@ -465,7 +469,7 @@ impl ConnectionManager {
     }
 
     pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool {
-        if old_peer == &new_peer {
+        if old_peer.addr == new_peer.addr && old_peer.pub_key == new_peer.pub_key {
             tracing::debug!(%old_peer, "update_peer_identity: identical peers; skipping");
             return false;
         }
diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs
index bbd99a71f..9d06ec191 100644
--- a/crates/core/tests/connectivity.rs
+++ b/crates/core/tests/connectivity.rs
@@ -309,8 +309,9 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResult
     let mut client2 = WebApi::start(stream2);
 
     // Retry loop to wait for full mesh connectivity
-    const MAX_RETRIES: usize = 30;
-    const RETRY_DELAY: Duration = Duration::from_secs(1);
+    // CI can be slower; give more attempts and longer waits before declaring failure.
+    const MAX_RETRIES: usize = 90;
+    const RETRY_DELAY: Duration = Duration::from_secs(2);
     let mut mesh_established = false;
     let mut last_snapshot = (String::new(), String::new(), String::new());
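Both promotion paths in this patch now pass through the same two gates before the ring is touched: the admission heuristic, then the hard connection cap. A simplified sketch of that gate, with a stripped-down manager standing in for `ConnectionManager` (the real `should_accept` also weighs topology and reserves a slot):

    struct Manager {
        num_connections: usize,
        max_connections: usize,
    }

    impl Manager {
        // Stand-in for the admission heuristic in should_accept().
        fn should_accept(&self) -> bool {
            true
        }

        /// Promotion re-runs admission: the link was admitted under the
        /// transient budget, so skipping these checks would let it bypass
        /// the ring's capacity limits.
        fn try_promote(&mut self) -> Result<(), &'static str> {
            if !self.should_accept() {
                return Err("rejected by admission logic");
            }
            if self.num_connections >= self.max_connections {
                return Err("connection cap reached");
            }
            self.num_connections += 1; // add_connection(..) in the real code
            Ok(())
        }
    }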
From 06562edbccaec10278aaadc8d489bd7b31c45981 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 23 Nov 2025 20:24:38 -0600
Subject: [PATCH 13/13] fix: align ring structs with main; add routing candidates helper

---
 Cargo.lock                                 |   1 +
 crates/core/Cargo.toml                     |   1 +
 crates/core/src/ring/connection_manager.rs | 251 +++++++++------------
 crates/core/src/ring/mod.rs                |  73 +++---
 4 files changed, 154 insertions(+), 172 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 26753b4cf..5f3d68c75 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1686,6 +1686,7 @@ dependencies = [
 "serde",
 "serde_json",
 "serde_with",
+ "sha2",
 "sqlx",
 "statrs",
 "stretto",
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 9e5fbd64b..18695cfc2 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -65,6 +65,7 @@ xz2 = { version = "0.1" }
 reqwest = { version = "0.12", features = ["json"] }
 rsa = { version = "0.9", features = ["serde", "pem"] }
 pkcs8 = { version = "0.10", features = ["std", "pem"] }
+sha2 = "0.10"
 
 # Tracing deps
 opentelemetry = "0.31"
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index c63638b2a..f910becf2 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -22,8 +22,7 @@ pub(crate) struct TransientEntry {
 
 #[derive(Clone)]
 pub(crate) struct ConnectionManager {
-    open_connections: Arc<AtomicUsize>,
-    reserved_connections: Arc<AtomicUsize>,
+    pending_reservations: Arc<RwLock<BTreeMap<PeerId, Location>>>,
     pub(super) location_for_peer: Arc<RwLock<BTreeMap<PeerId, Location>>>,
     pub(super) topology_manager: Arc<RwLock<TopologyManager>>,
     connections_by_location: Arc<RwLock<BTreeMap<Location, Vec<Connection>>>>,
@@ -50,15 +49,11 @@ impl ConnectionManager {
             Ring::DEFAULT_MIN_CONNECTIONS
         };
 
-        let mut max_connections = if let Some(v) = config.max_number_conn {
+        let max_connections = if let Some(v) = config.max_number_conn {
             v
         } else {
             Ring::DEFAULT_MAX_CONNECTIONS
         };
-        // Gateways benefit from a wider neighbor set for forwarding; default to a higher cap when unset.
-        if config.is_gateway && config.max_number_conn.is_none() {
-            max_connections = 20;
-        }
 
         let max_upstream_bandwidth = if let Some(v) = config.max_upstream_bandwidth {
             v
@@ -128,8 +123,7 @@ impl ConnectionManager {
         Self {
             connections_by_location: Arc::new(RwLock::new(BTreeMap::new())),
             location_for_peer: Arc::new(RwLock::new(BTreeMap::new())),
-            open_connections: Arc::new(AtomicUsize::new(0)),
-            reserved_connections: Arc::new(AtomicUsize::new(0)),
+            pending_reservations: Arc::new(RwLock::new(BTreeMap::new())),
             topology_manager,
             own_location: own_location.into(),
             peer_key: Arc::new(Mutex::new(peer_id)),
@@ -152,12 +146,8 @@ impl ConnectionManager {
     /// Will panic if the node checking for this condition has no location assigned.
     pub fn should_accept(&self, location: Location, peer_id: &PeerId) -> bool {
         tracing::info!("Checking if should accept connection");
-        let open = self
-            .open_connections
-            .load(std::sync::atomic::Ordering::SeqCst);
-        let reserved_before = self
-            .reserved_connections
-            .load(std::sync::atomic::Ordering::SeqCst);
+        let open = self.connection_count();
+        let reserved_before = self.pending_reservations.read().len();
 
         tracing::info!(
             %peer_id,
@@ -179,34 +169,16 @@ impl ConnectionManager {
             );
         }
 
-        let reserved_before = loop {
-            let current = self
-                .reserved_connections
-                .load(std::sync::atomic::Ordering::SeqCst);
-            if current == usize::MAX {
-                tracing::error!(
-                    %peer_id,
-                    "reserved connection counter overflowed; rejecting new connection"
-                );
-                return false;
-            }
-            match self.reserved_connections.compare_exchange(
-                current,
-                current + 1,
-                std::sync::atomic::Ordering::SeqCst,
-                std::sync::atomic::Ordering::SeqCst,
-            ) {
-                Ok(_) => break current,
-                Err(actual) => {
-                    tracing::debug!(
-                        %peer_id,
-                        expected = current,
-                        actual,
-                        "reserved connection counter changed concurrently; retrying"
-                    );
-                }
-            }
-        };
+        if self.location_for_peer.read().get(peer_id).is_some() {
+            // We've already accepted this peer (pending or active); treat as a no-op acceptance.
+            tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
+            return true;
+        }
+
+        {
+            let mut pending = self.pending_reservations.write();
+            pending.insert(peer_id.clone(), location);
+        }
 
         let total_conn = match reserved_before
             .checked_add(1)
@@ -220,8 +192,7 @@ impl ConnectionManager {
                     open,
                     "connection counters would overflow; rejecting connection"
                 );
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+                self.pending_reservations.write().remove(peer_id);
                 return false;
             }
         };
@@ -231,12 +202,6 @@ impl ConnectionManager {
             return true;
         }
 
-        if self.location_for_peer.read().get(peer_id).is_some() {
-            // We've already accepted this peer (pending or active); treat as a no-op acceptance.
-            tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
-            return true;
-        }
-
         let accepted = if total_conn < self.min_connections {
             tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)");
             true
@@ -263,14 +228,13 @@ impl ConnectionManager {
             accepted,
             total_conn,
             open_connections = open,
-            reserved_connections = self
-                .reserved_connections
-                .load(std::sync::atomic::Ordering::SeqCst),
+            reserved_connections = self.pending_reservations.read().len(),
+            max_connections = self.max_connections,
+            min_connections = self.min_connections,
             "should_accept: final decision"
         );
         if !accepted {
-            self.reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+            self.pending_reservations.write().remove(peer_id);
         } else {
             tracing::info!(%peer_id, total_conn, "should_accept: accepted (reserving spot)");
             self.record_pending_location(peer_id, location);
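Reservation accounting now lives in one structure instead of two atomics, which makes the reserve/rollback/consume lifecycle explicit. A minimal sketch of that lifecycle, with plain std types standing in for `PeerId`, `Location`, and the parking_lot locks the crate uses:

    use std::collections::BTreeMap;
    use std::sync::RwLock;

    type PeerId = String;
    type Location = f64;

    struct Manager {
        pending_reservations: RwLock<BTreeMap<PeerId, Location>>,
        established: RwLock<BTreeMap<PeerId, Location>>,
        max_connections: usize,
    }

    impl Manager {
        fn should_accept(&self, peer: PeerId, loc: Location) -> bool {
            // Reserve up front so concurrent admissions see the slot as taken.
            self.pending_reservations.write().unwrap().insert(peer.clone(), loc);
            let total = self.established.read().unwrap().len()
                + self.pending_reservations.read().unwrap().len();
            if total > self.max_connections {
                // Roll the reservation back when the peer is rejected.
                self.pending_reservations.write().unwrap().remove(&peer);
                return false;
            }
            true
        }

        fn add_connection(&self, peer: PeerId, loc: Location) {
            // Promotion to an established connection consumes the reservation.
            self.pending_reservations.write().unwrap().remove(&peer);
            self.established.write().unwrap().insert(peer, loc);
        }
    }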
@@ -439,20 +403,30 @@ impl ConnectionManager {
         tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology");
         debug_assert!(self.get_peer_key().expect("should be set") != peer);
         if was_reserved {
-            let old = self
-                .reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-            #[cfg(debug_assertions)]
-            {
-                tracing::debug!(old, "Decremented reserved connections");
-                if old == 0 {
-                    panic!("Underflow of reserved connections");
+            self.pending_reservations.write().remove(&peer);
+        }
+        let mut lop = self.location_for_peer.write();
+        let previous_location = lop.insert(peer.clone(), loc);
+        drop(lop);
+
+        if let Some(prev_loc) = previous_location {
+            tracing::info!(
+                %peer,
+                %prev_loc,
+                %loc,
+                "add_connection: replacing existing connection for peer"
+            );
+            let mut cbl = self.connections_by_location.write();
+            if let Some(prev_list) = cbl.get_mut(&prev_loc) {
+                if let Some(pos) = prev_list.iter().position(|c| c.location.peer == peer) {
+                    prev_list.swap_remove(pos);
+                }
+                if prev_list.is_empty() {
+                    cbl.remove(&prev_loc);
                 }
             }
-            let _ = old;
         }
-        let mut lop = self.location_for_peer.write();
-        lop.insert(peer.clone(), loc);
+
         {
             let mut cbl = self.connections_by_location.write();
             cbl.entry(loc).or_default().push(Connection {
@@ -460,16 +434,12 @@ impl ConnectionManager {
                     peer: peer.clone(),
                     location: Some(loc),
                 },
-                open_at: Instant::now(),
             });
         }
-        self.open_connections
-            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-        std::mem::drop(lop);
     }
 
     pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool {
-        if old_peer.addr == new_peer.addr && old_peer.pub_key == new_peer.pub_key {
+        if old_peer == &new_peer {
             tracing::debug!(%old_peer, "update_peer_identity: identical peers; skipping");
             return false;
         }
@@ -505,7 +475,6 @@ impl ConnectionManager {
                     peer: new_peer,
                     location: Some(loc),
                 },
-                open_at: Instant::now(),
             });
         }
 
@@ -523,17 +492,12 @@ impl ConnectionManager {
             tracing::debug!("no location found for peer, skip pruning");
             return None;
         } else {
-            let prev = self
-                .reserved_connections
-                .load(std::sync::atomic::Ordering::SeqCst);
-            if prev == 0 {
+            let removed = self.pending_reservations.write().remove(peer).is_some();
+            if !removed {
                 tracing::warn!(
                     %peer,
-                    "prune_connection: no reserved slots to release for in-transit peer"
+                    "prune_connection: no pending reservation to release for in-transit peer"
                 );
-            } else {
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
             }
         }
         return None;
@@ -546,29 +510,38 @@ impl ConnectionManager {
         }
     }
 
-        if is_alive {
-            self.open_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-        } else {
-            self.reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+        if !is_alive {
+            self.pending_reservations.write().remove(peer);
         }
 
         Some(loc)
     }
 
+    pub(crate) fn connection_count(&self) -> usize {
+        // Count only established connections tracked by location buckets.
+        self.connections_by_location
+            .read()
+            .values()
+            .map(|conns| conns.len())
+            .sum()
+    }
+
+    #[allow(dead_code)]
     pub(super) fn get_open_connections(&self) -> usize {
-        self.open_connections
-            .load(std::sync::atomic::Ordering::SeqCst)
+        self.connection_count()
     }
 
     #[allow(dead_code)]
     pub(crate) fn get_reserved_connections(&self) -> usize {
-        self.reserved_connections
-            .load(std::sync::atomic::Ordering::SeqCst)
+        self.pending_reservations.read().len()
     }
 
+    pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool {
+        self.location_for_peer.read().contains_key(peer)
+            || self.pending_reservations.read().contains_key(peer)
+    }
+
-    pub(super) fn get_connections_by_location(&self) -> BTreeMap<Location, Vec<Connection>> {
+    pub(crate) fn get_connections_by_location(&self) -> BTreeMap<Location, Vec<Connection>> {
         self.connections_by_location.read().clone()
     }
@@ -584,16 +557,31 @@ impl ConnectionManager {
         skip_list: impl Contains<PeerId>,
         router: &Router,
     ) -> Option<PeerKeyLocation> {
-        let candidates = self.routing_candidates(target, requesting, skip_list);
-
-        if candidates.is_empty() {
-            return None;
-        }
-
-        router.select_peer(candidates.iter(), target).cloned()
+        let connections = self.connections_by_location.read();
+        tracing::debug!(
+            total_locations = connections.len(),
+            self_peer = self
+                .get_peer_key()
+                .as_ref()
+                .map(|id| id.to_string())
+                .unwrap_or_else(|| "unknown".into()),
+            "routing: considering connections"
+        );
+        let peers = connections.values().filter_map(|conns| {
+            let conn = conns.choose(&mut rand::rng())?;
+            if self.is_transient(&conn.location.peer) {
+                return None;
+            }
+            if let Some(requester) = requesting {
+                if requester == &conn.location.peer {
+                    return None;
+                }
+            }
+            (!skip_list.has_element(conn.location.peer.clone())).then_some(&conn.location)
+        });
+        router.select_peer(peers, target).cloned()
     }
 
-    /// Gather routing candidates after applying skip/transient filters.
     pub fn routing_candidates(
         &self,
         target: Location,
@@ -601,49 +589,30 @@ impl ConnectionManager {
         requesting: Option<&PeerId>,
         skip_list: impl Contains<PeerId>,
     ) -> Vec<PeerKeyLocation> {
         let connections = self.connections_by_location.read();
-        let candidates: Vec<PeerKeyLocation> = connections
+        let mut candidates: Vec<PeerKeyLocation> = connections
             .values()
-            .filter_map(|conns| {
-                let conn = conns.choose(&mut rand::rng())?;
-                if self.is_transient(&conn.location.peer) {
-                    return None;
-                }
-                if let Some(requester) = requesting {
-                    if requester == &conn.location.peer {
-                        return None;
-                    }
-                }
-                (!skip_list.has_element(conn.location.peer.clone()))
-                    .then_some(conn.location.clone())
+            .flat_map(|conns| conns.iter())
+            .filter(|conn| {
+                !self.is_transient(&conn.location.peer)
+                    && (requesting != Some(&conn.location.peer))
+                    && !skip_list.has_element(conn.location.peer.clone())
             })
+            .map(|conn| conn.location.clone())
            .collect();
 
-        if candidates.is_empty() {
-            tracing::info!(
-                total_locations = connections.len(),
-                candidates = 0,
-                target = %target,
-                self_peer = self
-                    .get_peer_key()
-                    .as_ref()
-                    .map(|id| id.to_string())
-                    .unwrap_or_else(|| "unknown".into()),
-                "routing: no non-transient candidates"
-            );
-        } else {
-            tracing::info!(
-                total_locations = connections.len(),
-                candidates = candidates.len(),
-                target = %target,
-                self_peer = self
-                    .get_peer_key()
-                    .as_ref()
-                    .map(|id| id.to_string())
-                    .unwrap_or_else(|| "unknown".into()),
-                "routing: selecting next hop"
-            );
-        }
-
+        candidates.sort_by(|a, b| {
+            let da = a
+                .location
+                .unwrap_or_else(|| Location::from_address(&a.peer.addr))
+                .distance(target)
+                .as_f64();
+            let db = b
+                .location
+                .unwrap_or_else(|| Location::from_address(&b.peer.addr))
+                .distance(target)
+                .as_f64();
+            da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
+        });
         candidates
     }
 
@@ -663,8 +632,4 @@ impl ConnectionManager {
         let read = self.location_for_peer.read();
         read.keys().cloned().collect::<Vec<_>>().into_iter()
     }
-
-    pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool {
-        self.location_for_peer.read().contains_key(peer)
-    }
 }
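The rebuilt helper returns every eligible neighbor instead of one sample per location bucket, ordered by ring distance to the target so callers can walk the list in preference order. The ordering itself can be reproduced with bare floats standing in for `Location` (ring distance wraps around 1.0):

    /// Ring distance between two locations in [0, 1): the shorter way around.
    fn ring_distance(a: f64, b: f64) -> f64 {
        let d = (a - b).abs();
        d.min(1.0 - d)
    }

    fn sort_candidates_by_distance(candidates: &mut [f64], target: f64) {
        candidates.sort_by(|a, b| {
            let da = ring_distance(*a, target);
            let db = ring_distance(*b, target);
            da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
        });
    }

    fn main() {
        let mut cands = vec![0.9, 0.2, 0.45];
        sort_candidates_by_distance(&mut cands, 0.1);
        // 0.9 wraps around the ring (distance 0.2), so it ranks ahead of 0.45.
        assert_eq!(cands, vec![0.2, 0.9, 0.45]);
    }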
diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs
index 438602b41..b23bf8ebe 100644
--- a/crates/core/src/ring/mod.rs
+++ b/crates/core/src/ring/mod.rs
@@ -3,7 +3,7 @@
 //! Mainly maintains a healthy and optimal pool of connections to other peers in the network
 //! and routes requests to the optimal peers.
-use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, @@ -157,7 +157,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager.get_open_connections() + self.connection_manager.connection_count() } async fn refresh_router(router: Arc>, register: ER) { @@ -385,11 +385,6 @@ impl Ring { tracing::info!("Initializing connection maintenance task"); let is_gateway = self.is_gateway; tracing::info!(is_gateway, "Connection maintenance task starting"); - #[cfg(not(test))] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); - #[cfg(test)] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(5); - #[cfg(not(test))] const CHECK_TICK_DURATION: Duration = Duration::from_secs(60); #[cfg(test)] @@ -460,7 +455,7 @@ impl Ring { error })?; if live_tx.is_none() { - let conns = self.connection_manager.get_open_connections(); + let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns @@ -477,30 +472,50 @@ impl Ring { } } - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let pending_connection_targets = pending_conn_adds.len(); - let neighbor_locations = { - let peers = self.connection_manager.get_connections_by_location(); - tracing::debug!( - "Maintenance task: current connections = {}, checking topology", - current_connections + let peers = self.connection_manager.get_connections_by_location(); + let connections_considered: usize = peers.values().map(|c| c.len()).sum(); + + let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers + .iter() + .map(|(loc, conns)| { + let conns: Vec<_> = conns + .iter() + .filter(|conn| !live_tx_tracker.has_live_connection(&conn.location.peer)) + .cloned() + .collect(); + (*loc, conns) + }) + .filter(|(_, conns)| !conns.is_empty()) + .collect(); + + if neighbor_locations.is_empty() && connections_considered > 0 { + tracing::warn!( + current_connections, + connections_considered, + live_tx_peers = live_tx_tracker.len(), + "Neighbor filtering removed all candidates; using all connections" ); - peers + + neighbor_locations = peers .iter() - .map(|(loc, conns)| { - let conns: Vec<_> = conns - .iter() - .filter(|conn| { - conn.open_at.elapsed() > CONNECTION_AGE_THRESOLD - && !live_tx_tracker.has_live_connection(&conn.location.peer) - }) - .cloned() - .collect(); - (*loc, conns) - }) + .map(|(loc, conns)| (*loc, conns.clone())) .filter(|(_, conns)| !conns.is_empty()) - .collect() - }; + .collect(); + } + + if current_connections > self.connection_manager.max_connections { + // When over capacity, consider all connections for removal regardless of live_tx filter. + neighbor_locations = peers.clone(); + } + + tracing::debug!( + "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + current_connections, + peers.len(), + live_tx_tracker.len() + ); let adjustment = self .connection_manager @@ -589,7 +604,7 @@ impl Ring { live_tx_tracker: &LiveTransactionTracker, op_manager: &Arc, ) -> anyhow::Result> { - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let is_gateway = self.is_gateway; tracing::info!(