diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 907a8fa75..050b526ad 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -45,7 +45,7 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION"); // Initialize the executor once. static ASYNC_RT: LazyLock> = LazyLock::new(GlobalExecutor::initialize_async_rt); -const DEFAULT_TRANSIENT_BUDGET: usize = 32; +const DEFAULT_TRANSIENT_BUDGET: usize = 2048; const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30; const QUALIFIER: &str = ""; diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 15d1b1534..4cc9d99d1 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -635,6 +635,12 @@ impl P2pConnManager { "Failed to enqueue DropConnection command" ); } + // Immediately prune topology counters so we don't leak open connection slots. + ctx.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; if let Some(conn) = ctx.connections.remove(&peer) { // TODO: review: this could potentially leave garbage tasks in the background with peer listener match timeout( @@ -828,15 +834,23 @@ impl P2pConnManager { // Collect network information if config.include_network_info { - let connected_peers: Vec<_> = ctx - .connections - .keys() - .map(|p| (p.to_string(), p.addr.to_string())) - .collect(); + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peers = Vec::new(); + for conns in connections_by_loc.values() { + for conn in conns { + connected_peers.push(( + conn.location.peer.to_string(), + conn.location.peer.addr.to_string(), + )); + } + } + connected_peers.sort_by(|a, b| a.0.cmp(&b.0)); + connected_peers.dedup_by(|a, b| a.0 == b.0); response.network_info = Some(NetworkInfo { + active_connections: connected_peers.len(), connected_peers, - active_connections: ctx.connections.len(), }); } @@ -915,28 +929,43 @@ impl P2pConnManager { } } + // Collect topology-backed connection info (exclude transient transports). + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peer_ids = Vec::new(); + if config.include_detailed_peer_info { + use freenet_stdlib::client_api::ConnectedPeerInfo; + for conns in connections_by_loc.values() { + for conn in conns { + connected_peer_ids.push(conn.location.peer.to_string()); + response.connected_peers_detailed.push( + ConnectedPeerInfo { + peer_id: conn.location.peer.to_string(), + address: conn.location.peer.addr.to_string(), + }, + ); + } + } + } else { + for conns in connections_by_loc.values() { + connected_peer_ids.extend( + conns.iter().map(|c| c.location.peer.to_string()), + ); + } + } + connected_peer_ids.sort(); + connected_peer_ids.dedup(); + // Collect system metrics if config.include_system_metrics { let seeding_contracts = op_manager.ring.all_network_subscriptions().len() as u32; response.system_metrics = Some(SystemMetrics { - active_connections: ctx.connections.len() as u32, + active_connections: connected_peer_ids.len() as u32, seeding_contracts, }); } - // Collect detailed peer information if requested - if config.include_detailed_peer_info { - use freenet_stdlib::client_api::ConnectedPeerInfo; - // Populate detailed peer information from actual connections - for peer in ctx.connections.keys() { - response.connected_peers_detailed.push(ConnectedPeerInfo { - peer_id: peer.to_string(), - address: peer.addr.to_string(), - }); - } - } - match timeout( Duration::from_secs(2), callback.send(QueryResult::NodeDiagnostics(response)), @@ -976,6 +1005,13 @@ impl P2pConnManager { } => { tracing::debug!(%tx, %key, "local subscribe complete"); + // If this is a child operation, complete it and let the parent flow handle result delivery. + if op_manager.is_sub_operation(tx) { + tracing::info!(%tx, %key, "completing child subscribe operation"); + op_manager.completed(tx); + continue; + } + if !op_manager.is_sub_operation(tx) { let response = Ok(HostResponse::ContractResponse( ContractResponse::SubscribeResponse { key, subscribed }, @@ -1281,10 +1317,57 @@ impl P2pConnManager { let loc = entry .location .unwrap_or_else(|| Location::from_address(&peer.addr)); + // Re-run admission + cap guard when promoting a transient connection. + let should_accept = connection_manager.should_accept(loc, &peer); + if !should_accept { + tracing::warn!( + tx = %tx, + %peer, + %loc, + "connect_peer: promotion rejected by admission logic" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify rejected-promotion callback" + ); + }) + .ok(); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + tx = %tx, + %peer, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "connect_peer: rejecting transient promotion to enforce cap" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify cap-rejection callback" + ); + }) + .ok(); + return Ok(()); + } self.bridge .op_manager .ring - .add_connection(loc, peer.clone(), false) + .add_connection(loc, peer.clone(), true) .await; tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } @@ -1734,7 +1817,7 @@ impl P2pConnManager { ); return Ok(()); } - let current = connection_manager.num_connections(); + let current = connection_manager.connection_count(); if current >= connection_manager.max_connections { tracing::warn!( %peer_id, @@ -1749,7 +1832,7 @@ impl P2pConnManager { self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), false) + .add_connection(loc, peer_id.clone(), true) .await; if is_transient { connection_manager.drop_transient(&peer_id); @@ -1760,7 +1843,7 @@ impl P2pConnManager { let should_accept = connection_manager.should_accept(loc, &peer_id); if should_accept { connection_manager.drop_transient(&peer_id); - let current = connection_manager.num_connections(); + let current = connection_manager.connection_count(); if current >= connection_manager.max_connections { tracing::warn!( %peer_id, diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 456ebd54a..42f568e6b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -842,6 +842,15 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; + tracing::debug!( + peer = %gateway.peer, + reserved_connections = op_manager + .ring + .connection_manager + .get_reserved_connections(), + "join_ring_request: evaluating gateway connection attempt" + ); + if !op_manager .ring .connection_manager @@ -926,56 +935,71 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); + let mut in_flight_gateways = HashSet::new(); + loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); + let available_gateways: Vec<_> = unconnected_gateways + .into_iter() + .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) + .collect(); tracing::debug!( - "Connection status: open_connections = {}, unconnected_gateways = {}", - open_conns, - unconnected_gateways.len() + open_connections = open_conns, + inflight_gateway_dials = in_flight_gateways.len(), + available_gateways = available_gateways.len(), + "Connection status before join attempt" ); - let unconnected_count = unconnected_gateways.len(); + let available_count = available_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 { tracing::info!( "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways", open_conns, BOOTSTRAP_THRESHOLD, - number_of_parallel_connections.min(unconnected_count) + number_of_parallel_connections.min(available_count) ); - let select_all = FuturesUnordered::new(); - for gateway in unconnected_gateways + let mut select_all = FuturesUnordered::new(); + for gateway in available_gateways .into_iter() .shuffle() .take(number_of_parallel_connections) { tracing::info!(%gateway, "Attempting connection to gateway"); + in_flight_gateways.insert(gateway.peer.clone()); let op_manager = op_manager.clone(); + let gateway_clone = gateway.clone(); select_all.push(async move { - (join_ring_request(None, gateway, &op_manager).await, gateway) + ( + join_ring_request(None, &gateway_clone, &op_manager).await, + gateway_clone, + ) }); } - select_all - .for_each(|(res, gateway)| async move { - if let Err(error) = res { - if !matches!( - error, - OpError::ConnError( - crate::node::ConnectionError::UnwantedConnection - ) - ) { - tracing::error!( - %gateway, - %error, - "Failed while attempting connection to gateway" - ); - } + while let Some((res, gateway)) = select_all.next().await { + if let Err(error) = res { + if !matches!( + error, + OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) + ) { + tracing::error!( + %gateway, + %error, + "Failed while attempting connection to gateway" + ); } - }) - .await; + } + in_flight_gateways.remove(&gateway.peer); + } + } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 { + tracing::debug!( + open_connections = open_conns, + inflight = in_flight_gateways.len(), + "Below threshold but all gateways are already connected or in-flight" + ); } else if open_conns >= BOOTSTRAP_THRESHOLD { tracing::trace!( "Have {} connections (>= threshold of {}), not attempting gateway connections", diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index f910becf2..6300c9de7 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -14,8 +14,6 @@ use super::*; pub(crate) struct TransientEntry { /// Entry tracking a transient connection that hasn't been added to the ring topology yet. /// Transient connections are typically unsolicited inbound connections to gateways. - #[allow(dead_code)] - pub opened_at: Instant, /// Advertised location for the transient peer, if known at admission time. pub location: Option, } @@ -49,11 +47,15 @@ impl ConnectionManager { Ring::DEFAULT_MIN_CONNECTIONS }; - let max_connections = if let Some(v) = config.max_number_conn { + let mut max_connections = if let Some(v) = config.max_number_conn { v } else { Ring::DEFAULT_MAX_CONNECTIONS }; + // Gateways benefit from a wider neighbor set for forwarding; default to a higher cap when unset. + if config.is_gateway && config.max_number_conn.is_none() { + max_connections = 20; + } let max_upstream_bandwidth = if let Some(v) = config.max_upstream_bandwidth { v @@ -308,7 +310,6 @@ impl ConnectionManager { self.peer_key.lock().clone() } - #[allow(dead_code)] pub fn is_gateway(&self) -> bool { self.is_gateway } @@ -329,13 +330,8 @@ impl ConnectionManager { } let key = peer.clone(); - self.transient_connections.insert( - peer, - TransientEntry { - opened_at: Instant::now(), - location, - }, - ); + self.transient_connections + .insert(peer, TransientEntry { location }); let prev = self.transient_in_use.fetch_add(1, Ordering::SeqCst); if prev >= self.transient_budget { // Undo if we raced past the budget. @@ -409,6 +405,22 @@ impl ConnectionManager { let previous_location = lop.insert(peer.clone(), loc); drop(lop); + // Enforce the global cap when adding a new peer (relocations reuse the existing slot). + if previous_location.is_none() && self.connection_count() >= self.max_connections { + tracing::warn!( + %peer, + %loc, + max = self.max_connections, + "add_connection: rejecting new connection to enforce cap" + ); + // Roll back bookkeeping since we're refusing the connection. + self.location_for_peer.write().remove(&peer); + if was_reserved { + self.pending_reservations.write().remove(&peer); + } + return; + } + if let Some(prev_loc) = previous_location { tracing::info!( %peer, @@ -439,7 +451,7 @@ impl ConnectionManager { } pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool { - if old_peer == &new_peer { + if old_peer.addr == new_peer.addr && old_peer.pub_key == new_peer.pub_key { tracing::debug!(%old_peer, "update_peer_identity: identical peers; skipping"); return false; } @@ -557,31 +569,16 @@ impl ConnectionManager { skip_list: impl Contains, router: &Router, ) -> Option { - let connections = self.connections_by_location.read(); - tracing::debug!( - total_locations = connections.len(), - self_peer = self - .get_peer_key() - .as_ref() - .map(|id| id.to_string()) - .unwrap_or_else(|| "unknown".into()), - "routing: considering connections" - ); - let peers = connections.values().filter_map(|conns| { - let conn = conns.choose(&mut rand::rng())?; - if self.is_transient(&conn.location.peer) { - return None; - } - if let Some(requester) = requesting { - if requester == &conn.location.peer { - return None; - } - } - (!skip_list.has_element(conn.location.peer.clone())).then_some(&conn.location) - }); - router.select_peer(peers, target).cloned() + let candidates = self.routing_candidates(target, requesting, skip_list); + + if candidates.is_empty() { + return None; + } + + router.select_peer(candidates.iter(), target).cloned() } + /// Gather routing candidates after applying skip/transient filters. pub fn routing_candidates( &self, target: Location, @@ -589,30 +586,35 @@ impl ConnectionManager { skip_list: impl Contains, ) -> Vec { let connections = self.connections_by_location.read(); - let mut candidates: Vec = connections + let candidates: Vec = connections .values() - .flat_map(|conns| conns.iter()) - .filter(|conn| { - !self.is_transient(&conn.location.peer) - && (requesting != Some(&conn.location.peer)) - && !skip_list.has_element(conn.location.peer.clone()) + .filter_map(|conns| { + let conn = conns.choose(&mut rand::rng())?; + if self.is_transient(&conn.location.peer) { + return None; + } + if let Some(requester) = requesting { + if requester == &conn.location.peer { + return None; + } + } + (!skip_list.has_element(conn.location.peer.clone())) + .then_some(conn.location.clone()) }) - .map(|conn| conn.location.clone()) .collect(); - candidates.sort_by(|a, b| { - let da = a - .location - .unwrap_or_else(|| Location::from_address(&a.peer.addr)) - .distance(target) - .as_f64(); - let db = b - .location - .unwrap_or_else(|| Location::from_address(&b.peer.addr)) - .distance(target) - .as_f64(); - da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal) - }); + tracing::debug!( + total_locations = connections.len(), + candidates = candidates.len(), + target = %target, + self_peer = self + .get_peer_key() + .as_ref() + .map(|id| id.to_string()) + .unwrap_or_else(|| "unknown".into()), + "routing candidates for next hop (non-transient only)" + ); + candidates } diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index 83e0650f1..7b26e75fd 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -445,9 +445,9 @@ impl UdpPacketsListener { } else { // Non-gateway peers: mark as expected and wait for the normal peer handshake flow. self.expected_non_gateway.insert(remote_addr.ip()); - tracing::warn!( + tracing::debug!( %remote_addr, - "unexpected peer intro from non-gateway; marking expected_non_gateway and continuing" + "unexpected peer intro; marking expected_non_gateway" ); continue; } diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 342d3791e..79cb3673b 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -6,7 +6,6 @@ use rsa::{ Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey, }; use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct TransportKeypair { @@ -113,8 +112,15 @@ impl std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - let digest = Sha256::digest(encoded.as_bytes()); - write!(f, "{}", bs58::encode(digest).into_string()) + if encoded.as_bytes().len() >= 16 { + let bytes = encoded.as_bytes(); + let first_six = &bytes[..6]; + let last_six = &bytes[bytes.len() - 6..]; + let to_encode = [first_six, last_six].concat(); + write!(f, "{}", bs58::encode(to_encode).into_string()) + } else { + write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) + } } } diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index 9d06ec191..b06c09d66 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -420,6 +420,9 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu ); } + // Allow a brief settling period before exercising contract operations. + tokio::time::sleep(Duration::from_secs(2)).await; + // Verify functionality with PUT/GET tracing::info!("Verifying network functionality with PUT/GET operations"); diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs index e130d93fb..3373d5cd0 100644 --- a/crates/core/tests/test_network_integration.rs +++ b/crates/core/tests/test_network_integration.rs @@ -7,7 +7,7 @@ use freenet_test_network::TestNetwork; use testresult::TestResult; use tokio_tungstenite::connect_async; -// Build a fresh network for each test to avoid static Sync requirements +// Helper to get or create network async fn get_network() -> TestNetwork { TestNetwork::builder() .gateways(1)