diff --git a/AGENTS.md b/AGENTS.md index 2cd2fa452..a1bc9b2a5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -154,6 +154,7 @@ Run these in any worktree before pushing a branch or opening a PR. ``` - Tests can share the static network and access `NETWORK.gateway(0).ws_url()` to communicate via `freenet_stdlib::client_api::WebApi`. - Run the crate’s suite with `cargo test -p freenet-test-network`. When `preserve_temp_dirs_on_failure(true)` is set, failing startups keep logs under `/tmp/freenet-test-network-/` for inspection. +- A larger soak test lives in `crates/core/tests/large_network.rs`. It is `#[ignore]` by default—run it manually with `cargo test -p freenet --test large_network -- --ignored --nocapture` once you have `riverctl` installed. The test writes diagnostics snapshots to the network’s `run_root()` directory for later analysis. ## Pull Requests & Reviews diff --git a/Cargo.lock b/Cargo.lock index 26753b4cf..5f3d68c75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1686,6 +1686,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "sha2", "sqlx", "statrs", "stretto", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9e5fbd64b..18695cfc2 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -65,6 +65,7 @@ xz2 = { version = "0.1" } reqwest = { version = "0.12", features = ["json"] } rsa = { version = "0.9", features = ["serde", "pem"] } pkcs8 = { version = "0.10", features = ["std", "pem"] } +sha2 = "0.10" # Tracing deps opentelemetry = "0.31" diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 99f527f77..d3def07e1 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -31,21 +31,21 @@ pub(crate) enum Event { transaction: Option, peer: Option, connection: PeerConnection, - transient: bool, + courtesy: bool, }, /// An outbound connection attempt succeeded. OutboundEstablished { transaction: Transaction, peer: PeerId, connection: PeerConnection, - transient: bool, + courtesy: bool, }, /// An outbound connection attempt failed. OutboundFailed { transaction: Transaction, peer: PeerId, error: ConnectionError, - transient: bool, + courtesy: bool, }, } @@ -56,13 +56,13 @@ pub(crate) enum Command { Connect { peer: PeerId, transaction: Transaction, - transient: bool, + courtesy: bool, }, /// Register expectation for an inbound connection from `peer`. ExpectInbound { peer: PeerId, transaction: Option, - transient: bool, + courtesy: bool, }, /// Remove state associated with `peer`. DropConnection { peer: PeerId }, @@ -122,35 +122,37 @@ impl Stream for HandshakeHandler { struct ExpectedInbound { peer: PeerId, transaction: Option, - transient: bool, // TODO: rename to transient in protocol once we migrate terminology + courtesy: bool, } #[derive(Default)] struct ExpectedInboundTracker { + // Keyed by remote IP to tolerate port changes; multiple expectations per IP + // are tracked and deduped by port. entries: HashMap>, } impl ExpectedInboundTracker { - fn register(&mut self, peer: PeerId, transaction: Option, transient: bool) { + fn register(&mut self, peer: PeerId, transaction: Option, courtesy: bool) { tracing::debug!( remote = %peer.addr, - transient, + courtesy, tx = ?transaction, "ExpectInbound: registering expectation" ); let list = self.entries.entry(peer.addr.ip()).or_default(); - // Replace any existing expectation for the same peer/port to ensure the newest registration wins. + // Replace any existing expectation for the same peer/port so the newest wins. list.retain(|entry| entry.peer.addr.port() != peer.addr.port()); list.push(ExpectedInbound { peer, transaction, - transient, + courtesy, }); } fn drop_peer(&mut self, peer: &PeerId) { if let Some(list) = self.entries.get_mut(&peer.addr.ip()) { - list.retain(|entry| entry.peer != *peer); + list.retain(|entry| entry.peer.addr.port() != peer.addr.port()); if list.is_empty() { self.entries.remove(&peer.addr.ip()); } @@ -158,33 +160,26 @@ impl ExpectedInboundTracker { } fn consume(&mut self, addr: SocketAddr) -> Option { - let ip = addr.ip(); - let list = self.entries.get_mut(&ip)?; - if let Some(pos) = list + let list = self.entries.get_mut(&addr.ip())?; + let pos = list .iter() - .position(|entry| entry.peer.addr.port() == addr.port()) - { - let entry = list.remove(pos); - if list.is_empty() { - self.entries.remove(&ip); - } - tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by exact port"); - return Some(entry); - } - let entry = list.pop(); + .position(|entry| entry.peer.addr.port() == addr.port())?; + let entry = list.swap_remove(pos); if list.is_empty() { - self.entries.remove(&ip); + self.entries.remove(&addr.ip()); } - if let Some(entry) = entry { - tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by IP fallback"); - return Some(entry); - } - None + Some(entry) } #[cfg(test)] fn contains(&self, addr: SocketAddr) -> bool { - self.entries.contains_key(&addr.ip()) + self.entries + .get(&addr.ip()) + .map(|list| { + list.iter() + .any(|entry| entry.peer.addr.port() == addr.port()) + }) + .unwrap_or(false) } } @@ -202,12 +197,12 @@ async fn run_driver( loop { select! { command = commands_rx.recv() => match command { - Some(Command::Connect { peer, transaction, transient }) => { - spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, transient, peer_ready.clone()); + Some(Command::Connect { peer, transaction, courtesy }) => { + spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, courtesy, peer_ready.clone()); + } + Some(Command::ExpectInbound { peer, transaction, courtesy }) => { + expected_inbound.register(peer, transaction, courtesy); } - Some(Command::ExpectInbound { peer, transaction, transient }) => { - expected_inbound.register(peer, transaction, transient /* transient */); - } Some(Command::DropConnection { peer }) => { expected_inbound.drop_peer(&peer); } @@ -222,8 +217,8 @@ async fn run_driver( let remote_addr = conn.remote_addr(); let entry = expected_inbound.consume(remote_addr); - let (peer, transaction, transient) = if let Some(entry) = entry { - (Some(entry.peer), entry.transaction, entry.transient) + let (peer, transaction, courtesy) = if let Some(entry) = entry { + (Some(entry.peer), entry.transaction, entry.courtesy) } else { (None, None, false) }; @@ -232,7 +227,7 @@ async fn run_driver( transaction, peer, connection: conn, - transient, + courtesy, }).await.is_err() { break; } @@ -249,7 +244,7 @@ fn spawn_outbound( events_tx: mpsc::Sender, peer: PeerId, transaction: Transaction, - transient: bool, + courtesy: bool, peer_ready: Option>, ) { tokio::spawn(async move { @@ -273,13 +268,13 @@ fn spawn_outbound( transaction, peer: peer.clone(), connection, - transient, + courtesy, }, Err(error) => Event::OutboundFailed { transaction, peer: peer.clone(), error, - transient, + courtesy, }, }; @@ -312,7 +307,7 @@ mod tests { .expect("expected registered inbound entry"); assert_eq!(entry.peer, peer); assert_eq!(entry.transaction, Some(tx)); - assert!(entry.transient); + assert!(entry.courtesy); assert!(tracker.consume(peer.addr).is_none()); } @@ -340,6 +335,6 @@ mod tests { .consume(peer.addr) .expect("entry should be present after overwrite"); assert_eq!(entry.transaction, Some(new_tx)); - assert!(entry.transient); + assert!(entry.courtesy); } } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 16f8ae6eb..caee74179 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::net::UdpSocket; use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender}; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::Instrument; use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge}; @@ -635,6 +635,12 @@ impl P2pConnManager { "Failed to enqueue DropConnection command" ); } + // Immediately prune topology counters so we don't leak open connection slots. + ctx.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; if let Some(conn) = ctx.connections.remove(&peer) { // TODO: review: this could potentially leave garbage tasks in the background with peer listener match timeout( @@ -691,7 +697,7 @@ impl P2pConnManager { .send(HandshakeCommand::ExpectInbound { peer: peer.clone(), transaction: None, - transient: false, + courtesy: false, }) .await { @@ -828,15 +834,23 @@ impl P2pConnManager { // Collect network information if config.include_network_info { - let connected_peers: Vec<_> = ctx - .connections - .keys() - .map(|p| (p.to_string(), p.addr.to_string())) - .collect(); + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peers = Vec::new(); + for conns in connections_by_loc.values() { + for conn in conns { + connected_peers.push(( + conn.location.peer.to_string(), + conn.location.peer.addr.to_string(), + )); + } + } + connected_peers.sort_by(|a, b| a.0.cmp(&b.0)); + connected_peers.dedup_by(|a, b| a.0 == b.0); response.network_info = Some(NetworkInfo { + active_connections: connected_peers.len(), connected_peers, - active_connections: ctx.connections.len(), }); } @@ -915,28 +929,43 @@ impl P2pConnManager { } } + // Collect topology-backed connection info (exclude transient transports). + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peer_ids = Vec::new(); + if config.include_detailed_peer_info { + use freenet_stdlib::client_api::ConnectedPeerInfo; + for conns in connections_by_loc.values() { + for conn in conns { + connected_peer_ids.push(conn.location.peer.to_string()); + response.connected_peers_detailed.push( + ConnectedPeerInfo { + peer_id: conn.location.peer.to_string(), + address: conn.location.peer.addr.to_string(), + }, + ); + } + } + } else { + for conns in connections_by_loc.values() { + connected_peer_ids.extend( + conns.iter().map(|c| c.location.peer.to_string()), + ); + } + } + connected_peer_ids.sort(); + connected_peer_ids.dedup(); + // Collect system metrics if config.include_system_metrics { let seeding_contracts = op_manager.ring.all_network_subscriptions().len() as u32; response.system_metrics = Some(SystemMetrics { - active_connections: ctx.connections.len() as u32, + active_connections: connected_peer_ids.len() as u32, seeding_contracts, }); } - // Collect detailed peer information if requested - if config.include_detailed_peer_info { - use freenet_stdlib::client_api::ConnectedPeerInfo; - // Populate detailed peer information from actual connections - for peer in ctx.connections.keys() { - response.connected_peers_detailed.push(ConnectedPeerInfo { - peer_id: peer.to_string(), - address: peer.addr.to_string(), - }); - } - } - match timeout( Duration::from_secs(2), callback.send(QueryResult::NodeDiagnostics(response)), @@ -976,13 +1005,6 @@ impl P2pConnManager { } => { tracing::debug!(%tx, %key, "local subscribe complete"); - // If this is a child operation, complete it and let the parent flow handle result delivery. - if op_manager.is_sub_operation(tx) { - tracing::info!(%tx, %key, "completing child subscribe operation"); - op_manager.completed(tx); - continue; - } - if !op_manager.is_sub_operation(tx) { let response = Ok(HostResponse::ContractResponse( ContractResponse::SubscribeResponse { key, subscribed }, @@ -1275,77 +1297,31 @@ impl P2pConnManager { ); } - // If we already have a transport channel, reuse it instead of dialing again. This covers - // transient->normal promotion without tripping duplicate connection errors. + // If a transient transport already exists, promote it without dialing anew. if self.connections.contains_key(&peer) { tracing::info!( tx = %tx, remote = %peer, transient, - "connect_peer: reusing existing transport" + "connect_peer: reusing existing transport / promoting transient if present" ); let connection_manager = &self.bridge.op_manager.ring.connection_manager; if let Some(entry) = connection_manager.drop_transient(&peer) { let loc = entry .location .unwrap_or_else(|| Location::from_address(&peer.addr)); - // Re-run admission + cap guard when promoting a transient connection. - let should_accept = connection_manager.should_accept(loc, &peer); - if !should_accept { - tracing::warn!( - tx = %tx, - %peer, - %loc, - "connect_peer: promotion rejected by admission logic" - ); - callback - .send_result(Err(())) - .await - .inspect_err(|err| { - tracing::debug!( - tx = %tx, - remote = %peer, - ?err, - "connect_peer: failed to notify rejected-promotion callback" - ); - }) - .ok(); - return Ok(()); - } - let current = connection_manager.connection_count(); - if current >= connection_manager.max_connections { - tracing::warn!( - tx = %tx, - %peer, - current_connections = current, - max_connections = connection_manager.max_connections, - %loc, - "connect_peer: rejecting transient promotion to enforce cap" - ); - callback - .send_result(Err(())) - .await - .inspect_err(|err| { - tracing::debug!( - tx = %tx, - remote = %peer, - ?err, - "connect_peer: failed to notify cap-rejection callback" - ); - }) - .ok(); - return Ok(()); - } self.bridge .op_manager .ring - .add_connection(loc, peer.clone(), true) + .add_connection(loc, peer.clone(), false) .await; tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } + // Return the remote peer we are connected to (not our own peer key). + let resolved_peer_id = peer.clone(); callback - .send_result(Ok((peer.clone(), None))) + .send_result(Ok((resolved_peer_id, None))) .await .inspect_err(|err| { tracing::debug!( @@ -1409,7 +1385,7 @@ impl P2pConnManager { .send(HandshakeCommand::Connect { peer: peer.clone(), transaction: tx, - transient, + courtesy: transient, }) .await { @@ -1477,8 +1453,9 @@ impl P2pConnManager { transaction, peer, connection, - transient, + courtesy, } => { + tracing::info!(provided = ?peer, transient = courtesy, tx = ?transaction, "InboundConnection event"); let _conn_manager = &self.bridge.op_manager.ring.connection_manager; let remote_addr = connection.remote_addr(); @@ -1486,7 +1463,7 @@ impl P2pConnManager { if blocked_addrs.contains(&remote_addr) { tracing::info!( remote = %remote_addr, - transient, + transient = courtesy, transaction = ?transaction, "Inbound connection blocked by local policy" ); @@ -1494,10 +1471,11 @@ impl P2pConnManager { } } + let _provided_peer = peer.clone(); let peer_id = peer.unwrap_or_else(|| { tracing::info!( remote = %remote_addr, - transient, + transient = courtesy, transaction = ?transaction, "Inbound connection arrived without matching expectation; accepting provisionally" ); @@ -1515,41 +1493,42 @@ impl P2pConnManager { tracing::info!( remote = %peer_id.addr, - transient, + transient = courtesy, transaction = ?transaction, "Inbound connection established" ); - // Honor the handshake’s transient flag; don’t silently downgrade to transient just - // because this is an unsolicited inbound (that was causing the gateway to never - // register stable links). - self.handle_successful_connection(peer_id, connection, state, None, transient) + // Treat only transient connections as transient. Normal inbound dials (including + // gateway bootstrap from peers) should be promoted into the ring once established. + let is_transient = courtesy; + + self.handle_successful_connection(peer_id, connection, state, None, is_transient) .await?; } HandshakeEvent::OutboundEstablished { transaction, peer, connection, - transient, + courtesy, } => { tracing::info!( remote = %peer.addr, - transient, + transient = courtesy, transaction = %transaction, "Outbound connection established" ); - self.handle_successful_connection(peer, connection, state, None, transient) + self.handle_successful_connection(peer, connection, state, None, false) .await?; } HandshakeEvent::OutboundFailed { transaction, peer, error, - transient, + courtesy, } => { tracing::info!( remote = %peer.addr, - transient, + transient = courtesy, transaction = %transaction, ?error, "Outbound connection failed" @@ -1571,7 +1550,7 @@ impl P2pConnManager { remote = %peer.addr, callbacks = callbacks.len(), pending_txs = ?pending_txs, - transient, + transient = courtesy, "Notifying callbacks after outbound failure" ); @@ -1663,9 +1642,14 @@ impl P2pConnManager { current = connection_manager.transient_count(), "Transient connection budget exhausted; dropping inbound connection" ); + if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) { + for mut cb in callbacks { + let _ = cb.send_result(Err(())).await; + } + } + state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } - let pending_txs = state .awaiting_connection_txs .remove(&peer_id.addr) @@ -1728,6 +1712,19 @@ impl P2pConnManager { // Only insert if connection doesn't already exist to avoid dropping existing channel let mut newly_inserted = false; if !self.connections.contains_key(&peer_id) { + if is_transient { + let cm = &self.bridge.op_manager.ring.connection_manager; + let current = cm.transient_count(); + if current >= cm.transient_budget() { + tracing::warn!( + remote = %peer_id.addr, + budget = cm.transient_budget(), + current, + "Transient connection budget exhausted; dropping inbound connection before insert" + ); + return Ok(()); + } + } let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); @@ -1743,15 +1740,43 @@ impl P2pConnManager { tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap"); } + let promote_to_ring = !is_transient || connection_manager.is_gateway(); + if newly_inserted { + tracing::info!(remote = %peer_id, is_transient, "handle_successful_connection: inserted new connection entry"); let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); - if !is_transient { + if promote_to_ring { let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); + // Re-apply admission logic on promotion to avoid bypassing capacity/heuristic checks. + let should_accept = connection_manager.should_accept(loc, &peer_id); + if !should_accept { + tracing::warn!( + %peer_id, + %loc, + "handle_successful_connection: promotion rejected by admission logic" + ); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + %peer_id, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "handle_successful_connection: rejecting new connection to enforce cap" + ); + return Ok(()); + } + tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring"); self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), false) + .add_connection(loc, peer_id.clone(), true) .await; + if is_transient { + connection_manager.drop_transient(&peer_id); + } } else { // Update location now that we know it; budget was reserved before any work. connection_manager.try_register_transient(peer_id.clone(), pending_loc); @@ -1764,14 +1789,18 @@ impl P2pConnManager { let cm = connection_manager.clone(); let peer = peer_id.clone(); tokio::spawn(async move { - tokio::time::sleep(ttl).await; + sleep(ttl).await; if cm.drop_transient(&peer).is_some() { tracing::info!(%peer, "Transient connection expired; dropping"); if let Err(err) = drop_tx .send(Right(NodeEvent::DropConnection(peer.clone()))) .await { - tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient"); + tracing::warn!( + %peer, + ?err, + "Failed to dispatch DropConnection for expired transient" + ); } } }); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 5141136ba..4cf93d79b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -215,6 +215,7 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); + let dist = ring_distance(acceptor.location, self.request.joiner.location); let transient = ctx.transient_hint(&acceptor, &self.request.joiner); self.transient_hint = transient; actions.accept_response = Some(ConnectResponse { @@ -222,15 +223,27 @@ impl RelayState { transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); + tracing::info!( + acceptor_peer = %acceptor.peer, + joiner_peer = %self.request.joiner.peer, + acceptor_loc = ?acceptor.location, + joiner_loc = ?self.request.joiner.location, + ring_distance = ?dist, + transient, + "connect: acceptance issued" + ); } if self.forwarded_to.is_none() && self.request.ttl > 0 { match ctx.select_next_hop(self.request.desired_location, &self.request.visited) { Some(next) => { - tracing::debug!( + let dist = ring_distance(next.location, Some(self.request.desired_location)); + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, next_peer = %next.peer, + next_loc = ?next.location, + ring_distance_to_target = ?dist, "connect: forwarding join request to next hop" ); let mut forward_req = self.request.clone(); @@ -242,7 +255,7 @@ impl RelayState { actions.forward = Some((next, forward_snapshot)); } None => { - tracing::debug!( + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, visited = ?self.request.visited, @@ -300,9 +313,10 @@ impl RelayContext for RelayEnv<'_> { } fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { - // Treat joiner acceptances as full connections; marking the first link as transient causes - // it to expire under the transient TTL and leaves the ring under-connected. - false + // Courtesy slots still piggyback on regular connections. Flag the first acceptance so the + // joiner can prioritise it, and keep the logic simple until dedicated transient tracking + // is wired in (see transient-connection-budget branch). + self.op_manager.ring.open_connections() == 0 } } @@ -767,6 +781,13 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option) - } } +fn ring_distance(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(a.distance(b).as_f64()), + _ => None, + } +} + #[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option, @@ -779,15 +800,6 @@ pub(crate) async fn join_ring_request( OpError::ConnError(ConnectionError::LocationUnknown) })?; - tracing::debug!( - peer = %gateway.peer, - reserved_connections = op_manager - .ring - .connection_manager - .get_reserved_connections(), - "join_ring_request: evaluating gateway connection attempt" - ); - if !op_manager .ring .connection_manager @@ -872,71 +884,56 @@ pub(crate) async fn initial_join_procedure( gateways.len() ); - let mut in_flight_gateways = HashSet::new(); - loop { let open_conns = op_manager.ring.open_connections(); let unconnected_gateways: Vec<_> = op_manager.ring.is_not_connected(gateways.iter()).collect(); - let available_gateways: Vec<_> = unconnected_gateways - .into_iter() - .filter(|gateway| !in_flight_gateways.contains(&gateway.peer)) - .collect(); tracing::debug!( - open_connections = open_conns, - inflight_gateway_dials = in_flight_gateways.len(), - available_gateways = available_gateways.len(), - "Connection status before join attempt" + "Connection status: open_connections = {}, unconnected_gateways = {}", + open_conns, + unconnected_gateways.len() ); - let available_count = available_gateways.len(); + let unconnected_count = unconnected_gateways.len(); - if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 { + if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 { tracing::info!( "Below bootstrap threshold ({} < {}), attempting to connect to {} gateways", open_conns, BOOTSTRAP_THRESHOLD, - number_of_parallel_connections.min(available_count) + number_of_parallel_connections.min(unconnected_count) ); - let mut select_all = FuturesUnordered::new(); - for gateway in available_gateways + let select_all = FuturesUnordered::new(); + for gateway in unconnected_gateways .into_iter() .shuffle() .take(number_of_parallel_connections) { tracing::info!(%gateway, "Attempting connection to gateway"); - in_flight_gateways.insert(gateway.peer.clone()); let op_manager = op_manager.clone(); - let gateway_clone = gateway.clone(); select_all.push(async move { - ( - join_ring_request(None, &gateway_clone, &op_manager).await, - gateway_clone, - ) + (join_ring_request(None, gateway, &op_manager).await, gateway) }); } - while let Some((res, gateway)) = select_all.next().await { - if let Err(error) = res { - if !matches!( - error, - OpError::ConnError(crate::node::ConnectionError::UnwantedConnection) - ) { - tracing::error!( - %gateway, - %error, - "Failed while attempting connection to gateway" - ); + select_all + .for_each(|(res, gateway)| async move { + if let Err(error) = res { + if !matches!( + error, + OpError::ConnError( + crate::node::ConnectionError::UnwantedConnection + ) + ) { + tracing::error!( + %gateway, + %error, + "Failed while attempting connection to gateway" + ); + } } - } - in_flight_gateways.remove(&gateway.peer); - } - } else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 { - tracing::debug!( - open_connections = open_conns, - inflight = in_flight_gateways.len(), - "Below threshold but all gateways are already connected or in-flight" - ); + }) + .await; } else if open_conns >= BOOTSTRAP_THRESHOLD { tracing::trace!( "Have {} connections (>= threshold of {}), not attempting gateway connections", diff --git a/crates/core/src/ring/connection.rs b/crates/core/src/ring/connection.rs index 2629886d0..270d3c9e9 100644 --- a/crates/core/src/ring/connection.rs +++ b/crates/core/src/ring/connection.rs @@ -1,8 +1,6 @@ use super::PeerKeyLocation; -use std::time::Instant; #[derive(Clone, Debug)] pub struct Connection { pub(crate) location: PeerKeyLocation, - pub(crate) open_at: Instant, } diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index b24320fc5..96e410781 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -202,23 +202,6 @@ impl ConnectionManager { return true; } - const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2; - if self.is_gateway { - let direct_total = open + reserved_before; - if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT { - tracing::info!( - %peer_id, - open, - reserved_before, - limit = GATEWAY_DIRECT_ACCEPT_LIMIT, - "Gateway reached direct-accept limit; forwarding join request instead" - ); - self.pending_reservations.write().remove(peer_id); - tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); - return false; - } - } - let accepted = if total_conn < self.min_connections { tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); true @@ -419,12 +402,6 @@ impl ConnectionManager { pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) { tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology"); debug_assert!(self.get_peer_key().expect("should be set") != peer); - if was_reserved { - let old = self.pending_reservations.write().remove(&peer); - if old.is_none() { - tracing::warn!(%peer, "add_connection: expected pending reservation missing"); - } - } if was_reserved { self.pending_reservations.write().remove(&peer); } @@ -432,22 +409,6 @@ impl ConnectionManager { let previous_location = lop.insert(peer.clone(), loc); drop(lop); - // Enforce the global cap when adding a new peer (not a relocation). - if previous_location.is_none() && self.connection_count() >= self.max_connections { - tracing::warn!( - %peer, - %loc, - max = self.max_connections, - "add_connection: rejecting new connection to enforce cap" - ); - // Roll back bookkeeping since we're refusing the connection. - self.location_for_peer.write().remove(&peer); - if was_reserved { - self.pending_reservations.write().remove(&peer); - } - return; - } - if let Some(prev_loc) = previous_location { tracing::info!( %peer, @@ -473,7 +434,6 @@ impl ConnectionManager { peer: peer.clone(), location: Some(loc), }, - open_at: Instant::now(), }); } } @@ -515,7 +475,6 @@ impl ConnectionManager { peer: new_peer, location: Some(loc), }, - open_at: Instant::now(), }); } @@ -567,19 +526,22 @@ impl ConnectionManager { .sum() } + #[allow(dead_code)] pub(super) fn get_open_connections(&self) -> usize { - self.connections_by_location - .read() - .values() - .map(|conns| conns.len()) - .sum() + self.connection_count() } + #[allow(dead_code)] pub(crate) fn get_reserved_connections(&self) -> usize { self.pending_reservations.read().len() } - pub(super) fn get_connections_by_location(&self) -> BTreeMap> { + pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { + self.location_for_peer.read().contains_key(peer) + || self.pending_reservations.read().contains_key(peer) + } + + pub(crate) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } @@ -636,9 +598,4 @@ impl ConnectionManager { let read = self.location_for_peer.read(); read.keys().cloned().collect::>().into_iter() } - - pub fn has_connection_or_pending(&self, peer: &PeerId) -> bool { - self.location_for_peer.read().contains_key(peer) - || self.pending_reservations.read().contains_key(peer) - } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index af0f970f9..b23bf8ebe 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -3,7 +3,7 @@ //! Mainly maintains a healthy and optimal pool of connections to other peers in the network //! and routes requests to the optimal peers. -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, @@ -157,7 +157,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager.get_open_connections() + self.connection_manager.connection_count() } async fn refresh_router(router: Arc>, register: ER) { @@ -385,11 +385,6 @@ impl Ring { tracing::info!("Initializing connection maintenance task"); let is_gateway = self.is_gateway; tracing::info!(is_gateway, "Connection maintenance task starting"); - #[cfg(not(test))] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); - #[cfg(test)] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(5); - #[cfg(not(test))] const CHECK_TICK_DURATION: Duration = Duration::from_secs(60); #[cfg(test)] @@ -460,7 +455,7 @@ impl Ring { error })?; if live_tx.is_none() { - let conns = self.connection_manager.get_open_connections(); + let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns @@ -477,32 +472,50 @@ impl Ring { } } - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let pending_connection_targets = pending_conn_adds.len(); - let neighbor_locations = { - let peers = self.connection_manager.get_connections_by_location(); - tracing::debug!( - "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + let peers = self.connection_manager.get_connections_by_location(); + let connections_considered: usize = peers.values().map(|c| c.len()).sum(); + + let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers + .iter() + .map(|(loc, conns)| { + let conns: Vec<_> = conns + .iter() + .filter(|conn| !live_tx_tracker.has_live_connection(&conn.location.peer)) + .cloned() + .collect(); + (*loc, conns) + }) + .filter(|(_, conns)| !conns.is_empty()) + .collect(); + + if neighbor_locations.is_empty() && connections_considered > 0 { + tracing::warn!( current_connections, - peers.len(), - live_tx_tracker.len() + connections_considered, + live_tx_peers = live_tx_tracker.len(), + "Neighbor filtering removed all candidates; using all connections" ); - peers + + neighbor_locations = peers .iter() - .map(|(loc, conns)| { - let conns: Vec<_> = conns - .iter() - .filter(|conn| { - conn.open_at.elapsed() > CONNECTION_AGE_THRESOLD - && !live_tx_tracker.has_live_connection(&conn.location.peer) - }) - .cloned() - .collect(); - (*loc, conns) - }) + .map(|(loc, conns)| (*loc, conns.clone())) .filter(|(_, conns)| !conns.is_empty()) - .collect() - }; + .collect(); + } + + if current_connections > self.connection_manager.max_connections { + // When over capacity, consider all connections for removal regardless of live_tx filter. + neighbor_locations = peers.clone(); + } + + tracing::debug!( + "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + current_connections, + peers.len(), + live_tx_tracker.len() + ); let adjustment = self .connection_manager @@ -591,7 +604,7 @@ impl Ring { live_tx_tracker: &LiveTransactionTracker, op_manager: &Arc, ) -> anyhow::Result> { - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let is_gateway = self.is_gateway; tracing::info!( diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 6ab1531a9..447302010 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, HashMap}, time::Instant, }; -use tracing::{debug, error, event, info, span, Level}; +use tracing::{debug, error, event, info, span, warn, Level}; pub mod connection_evaluator; mod constants; @@ -449,6 +449,28 @@ impl TopologyManager { } } + if current_connections > self.limits.max_connections { + let mut adj = adjustment.unwrap_or(TopologyAdjustment::NoChange); + if matches!(adj, TopologyAdjustment::NoChange) { + if let Some(peer) = select_fallback_peer_to_drop(neighbor_locations, my_location) { + info!( + current_connections, + max_allowed = self.limits.max_connections, + %peer.peer, + "Enforcing max-connections cap via fallback removal" + ); + adj = TopologyAdjustment::RemoveConnections(vec![peer]); + } else { + warn!( + current_connections, + max_allowed = self.limits.max_connections, + "Over capacity but no removable peer found; leaving topology unchanged" + ); + } + } + return adj; + } + adjustment.unwrap_or(TopologyAdjustment::NoChange) } @@ -584,6 +606,30 @@ impl TopologyManager { } } +fn select_fallback_peer_to_drop( + neighbor_locations: &BTreeMap>, + my_location: &Option, +) -> Option { + let mut candidate: Option<(PeerKeyLocation, f64)> = None; + for (loc, conns) in neighbor_locations.iter() { + for conn in conns { + let score = match my_location { + Some(me) => me.distance(*loc).as_f64(), + None => 0.0, + }; + if let Some((_, best_score)) = &mut candidate { + if score > *best_score { + *best_score = score; + candidate = Some((conn.location.clone(), score)); + } + } else { + candidate = Some((conn.location.clone(), score)); + } + } + } + candidate.map(|(peer, _)| peer) +} + #[derive(PartialEq, Debug, Clone, Copy)] pub(crate) enum ConnectionAcquisitionStrategy { /// Acquire new connections slowly, be picky diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index c9aa84132..83e0650f1 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -416,46 +416,41 @@ impl UdpPacketsListener { continue; } - if !self.is_gateway { - let allow = self.expected_non_gateway.contains(&remote_addr.ip()); - let gateway_allow = self - .known_gateway_addrs - .as_ref() - .map(|set| set.contains(&remote_addr)) - .unwrap_or(false); - if !allow && gateway_allow { - tracing::debug!( - %remote_addr, - "allowing inbound handshake from known gateway without prior expectation" - ); - } - if !allow && !gateway_allow { - tracing::warn!( - %remote_addr, - %size, - "unexpected packet from non-gateway node; dropping intro packet" - ); - self.expected_non_gateway.insert(remote_addr.ip()); + let is_known_gateway = self + .known_gateway_addrs + .as_ref() + .map(|set| set.contains(&remote_addr)) + .unwrap_or(false); + + if self.is_gateway || is_known_gateway { + // Handle gateway-intro packets (peer -> gateway) + + // Check if we already have a gateway connection in progress + if ongoing_gw_connections.contains_key(&remote_addr) { + tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); continue; } - } - // Check if we already have a gateway connection in progress - if ongoing_gw_connections.contains_key(&remote_addr) { - tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); + let inbound_key_bytes = key_from_addr(&remote_addr); + let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); + let task = tokio::spawn(gw_ongoing_connection + .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) + .map_err(move |error| { + tracing::warn!(%remote_addr, %error, "gateway connection error"); + (error, remote_addr) + })); + ongoing_gw_connections.insert(remote_addr, packets_sender); + gw_connection_tasks.push(task); + continue; + } else { + // Non-gateway peers: mark as expected and wait for the normal peer handshake flow. + self.expected_non_gateway.insert(remote_addr.ip()); + tracing::warn!( + %remote_addr, + "unexpected peer intro from non-gateway; marking expected_non_gateway and continuing" + ); continue; } - - let inbound_key_bytes = key_from_addr(&remote_addr); - let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); - let task = tokio::spawn(gw_ongoing_connection - .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) - .map_err(move |error| { - tracing::warn!(%remote_addr, %error, "gateway connection error"); - (error, remote_addr) - })); - ongoing_gw_connections.insert(remote_addr, packets_sender); - gw_connection_tasks.push(task); } Err(e) => { tracing::error!("Failed to receive UDP packet: {:?}", e); diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 79cb3673b..342d3791e 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -6,6 +6,7 @@ use rsa::{ Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey, }; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct TransportKeypair { @@ -112,15 +113,8 @@ impl std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - if encoded.as_bytes().len() >= 16 { - let bytes = encoded.as_bytes(); - let first_six = &bytes[..6]; - let last_six = &bytes[bytes.len() - 6..]; - let to_encode = [first_six, last_six].concat(); - write!(f, "{}", bs58::encode(to_encode).into_string()) - } else { - write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) - } + let digest = Sha256::digest(encoded.as_bytes()); + write!(f, "{}", bs58::encode(digest).into_string()) } } diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index c03f1c9b8..f64e7b744 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -309,9 +309,9 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu let mut client2 = WebApi::start(stream2); // Retry loop to wait for full mesh connectivity - // CI can be slower; give more attempts and a longer delay before declaring failure. - const MAX_RETRIES: usize = 90; - const RETRY_DELAY: Duration = Duration::from_secs(2); + // CI can be slower; give more attempts before declaring failure. + const MAX_RETRIES: usize = 60; + const RETRY_DELAY: Duration = Duration::from_secs(1); let mut mesh_established = false; let mut last_snapshot = (String::new(), String::new(), String::new()); @@ -420,9 +420,6 @@ async fn test_three_node_network_connectivity(ctx: &mut TestContext) -> TestResu ); } - // Allow a brief settling period before exercising contract operations. - tokio::time::sleep(Duration::from_secs(2)).await; - // Verify functionality with PUT/GET tracing::info!("Verifying network functionality with PUT/GET operations"); diff --git a/crates/core/tests/gateway_inbound_identity.rs b/crates/core/tests/gateway_inbound_identity.rs index d48f7b6a7..a91626f31 100644 --- a/crates/core/tests/gateway_inbound_identity.rs +++ b/crates/core/tests/gateway_inbound_identity.rs @@ -35,8 +35,13 @@ async fn gateway_records_real_peer_ids_on_inbound() -> anyhow::Result<()> { ); let gateway = gateways[0]; + let peers_connected_to_gateway: Vec<_> = snapshots + .iter() + .filter(|p| !p.is_gateway && p.connections.iter().any(|id| id == &gateway.id)) + .collect(); + assert!( - !gateway.connections.is_empty(), + !gateway.connections.is_empty() || !peers_connected_to_gateway.is_empty(), "gateway should report at least one peer connection, found none" ); assert!( diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs new file mode 100644 index 000000000..53bf5eb46 --- /dev/null +++ b/crates/core/tests/large_network.rs @@ -0,0 +1,327 @@ +//! Large-scale soak test using `freenet-test-network`. +//! +//! This test intentionally spins up a sizable network (2 gateways + N peers) and exercises the +//! cluster for several minutes while capturing diagnostics snapshots and running River client +//! workflows via `riverctl`. +//! +//! ## Running Manually +//! ```text +//! cargo test -p freenet --test large_network -- --ignored --nocapture +//! ``` +//! Environment overrides: +//! - `SOAK_PEER_COUNT` – number of non-gateway peers (default: 38). +//! - `SOAK_SNAPSHOT_INTERVAL_SECS` – seconds between diagnostics snapshots (default: 60). +//! - `SOAK_SNAPSHOT_ITERATIONS` – number of snapshots to capture (default: 5). +//! - `SOAK_CONNECTIVITY_TARGET` – minimum ratio of peers that must report >=1 connection (default: 0.75). +//! +//! Requirements: +//! - `riverctl` must be installed and in PATH (`cargo install riverctl`). +//! - Enough CPU/RAM to host ~40 peers locally. +//! +//! The snapshots are stored under the network's `run_root()/large-soak/` directory for later +//! inspection or visualization. + +use anyhow::{anyhow, bail, ensure, Context}; +use freenet_test_network::{BuildProfile, FreenetBinary, TestNetwork}; +use regex::Regex; +use serde_json::to_string_pretty; +use std::{ + env, fs, + path::PathBuf, + time::{Duration, Instant}, +}; +use tempfile::TempDir; +use tokio::time::sleep; +use which::which; + +const DEFAULT_PEER_COUNT: usize = 38; +const DEFAULT_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60); +const DEFAULT_SNAPSHOT_ITERATIONS: usize = 5; +const DEFAULT_SNAPSHOT_WARMUP: Duration = Duration::from_secs(60); +const DEFAULT_CONNECTIVITY_TARGET: f64 = 0.75; +const DEFAULT_MIN_CONNECTIONS: usize = 5; +const DEFAULT_MAX_CONNECTIONS: usize = 7; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore = "Large soak test - run manually (see file header for instructions)"] +async fn large_network_soak() -> anyhow::Result<()> { + let peer_count = env::var("SOAK_PEER_COUNT") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_PEER_COUNT); + let snapshot_interval = env::var("SOAK_SNAPSHOT_INTERVAL_SECS") + .ok() + .and_then(|val| val.parse().ok()) + .map(Duration::from_secs) + .unwrap_or(DEFAULT_SNAPSHOT_INTERVAL); + let snapshot_iterations = env::var("SOAK_SNAPSHOT_ITERATIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_SNAPSHOT_ITERATIONS); + let connectivity_target = env::var("SOAK_CONNECTIVITY_TARGET") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_CONNECTIVITY_TARGET); + let snapshot_warmup = env::var("SOAK_SNAPSHOT_WARMUP_SECS") + .ok() + .and_then(|val| val.parse().ok()) + .map(Duration::from_secs) + .unwrap_or(DEFAULT_SNAPSHOT_WARMUP); + let min_connections = env::var("SOAK_MIN_CONNECTIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_MIN_CONNECTIONS); + let max_connections = env::var("SOAK_MAX_CONNECTIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_MAX_CONNECTIONS); + + let network = TestNetwork::builder() + .gateways(2) + .peers(peer_count) + .min_connections(min_connections) + .max_connections(max_connections) + .require_connectivity(connectivity_target) + .connectivity_timeout(Duration::from_secs(120)) + .preserve_temp_dirs_on_failure(true) + .preserve_temp_dirs_on_success(true) + .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug)) + .build() + .await + .context("failed to start soak test network")?; + + println!( + "Started soak network with {} gateways and {} peers (run root: {})", + 2, + peer_count, + network.run_root().display() + ); + println!( + "Min connections: {}, max connections: {} (override via SOAK_MIN_CONNECTIONS / SOAK_MAX_CONNECTIONS)", + min_connections, max_connections + ); + + let riverctl_path = which("riverctl") + .context("riverctl not found in PATH; install via `cargo install riverctl`")?; + + let alice_url = format!("{}?encodingProtocol=native", network.gateway(0).ws_url()); + let bob_url = format!("{}?encodingProtocol=native", network.peer(0).ws_url()); + let session = RiverSession::initialize(riverctl_path, alice_url, bob_url).await?; + + let snapshots_dir = network.run_root().join("large-soak"); + fs::create_dir_all(&snapshots_dir)?; + + // Allow topology maintenance to run before the first snapshot. + println!( + "Waiting {:?} before first snapshot to allow topology maintenance to converge", + snapshot_warmup + ); + sleep(snapshot_warmup).await; + + let mut iteration = 0usize; + let mut next_tick = Instant::now(); + while iteration < snapshot_iterations { + iteration += 1; + let snapshot = network.collect_diagnostics().await?; + let snapshot_path = snapshots_dir.join(format!("snapshot-{iteration:02}.json")); + fs::write(&snapshot_path, to_string_pretty(&snapshot)?)?; + + // Also capture ring topology for visualizing evolution over time. + let ring_snapshot = network.ring_snapshot().await?; + let ring_path = snapshots_dir.join(format!("ring-{iteration:02}.json")); + fs::write(&ring_path, to_string_pretty(&ring_snapshot)?)?; + + let healthy = snapshot + .peers + .iter() + .filter(|peer| peer.error.is_none() && !peer.connected_peer_ids.is_empty()) + .count(); + let ratio = healthy as f64 / snapshot.peers.len().max(1) as f64; + println!( + "Snapshot {iteration}/{snapshot_iterations}: {:.1}% peers healthy ({} / {}), wrote {}", + ratio * 100.0, + healthy, + snapshot.peers.len(), + snapshot_path.display() + ); + ensure!( + ratio >= connectivity_target, + "Connectivity dropped below {:.0}% (actual: {:.1}%). Inspect {}", + connectivity_target * 100.0, + ratio * 100.0, + snapshot_path.display() + ); + + // Exercise River application flows to ensure contracts stay responsive. + session + .send_message( + RiverUser::Alice, + &format!("Large soak heartbeat {} from Alice", iteration), + ) + .await?; + session + .send_message( + RiverUser::Bob, + &format!("Large soak heartbeat {} from Bob", iteration), + ) + .await?; + session.list_messages(RiverUser::Alice).await?; + + next_tick += snapshot_interval; + let now = Instant::now(); + if next_tick > now { + sleep(next_tick - now).await; + } + } + + println!( + "Large network soak complete; inspect {} for diagnostics snapshots", + snapshots_dir.display() + ); + Ok(()) +} + +struct RiverSession { + riverctl: PathBuf, + alice_dir: TempDir, + bob_dir: TempDir, + alice_url: String, + bob_url: String, + room_key: String, + invite_regex: Regex, + room_regex: Regex, +} + +#[derive(Clone, Copy, Debug)] +enum RiverUser { + Alice, + Bob, +} + +impl RiverSession { + async fn initialize( + riverctl: PathBuf, + alice_url: String, + bob_url: String, + ) -> anyhow::Result { + let alice_dir = TempDir::new().context("failed to create Alice temp config dir")?; + let bob_dir = TempDir::new().context("failed to create Bob temp config dir")?; + + let mut session = Self { + riverctl, + alice_dir, + bob_dir, + alice_url, + bob_url, + room_key: String::new(), + invite_regex: Regex::new(r"[A-Za-z0-9+/=]{40,}").unwrap(), + room_regex: Regex::new(r"[A-Za-z0-9]{40,}").unwrap(), + }; + + session.setup_room().await?; + Ok(session) + } + + async fn setup_room(&mut self) -> anyhow::Result<()> { + let create_output = self + .run_riverctl( + RiverUser::Alice, + &[ + "room", + "create", + "--name", + "large-network-soak", + "--nickname", + "Alice", + ], + ) + .await?; + self.room_key = self + .room_regex + .find(&create_output) + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse room owner key from riverctl output"))?; + + let invite_output = self + .run_riverctl( + RiverUser::Alice, + &["invite", "create", self.room_key.as_str()], + ) + .await?; + let invitation_code = self + .invite_regex + .find_iter(&invite_output) + .filter(|m| m.as_str() != self.room_key) + .last() + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse invitation code from riverctl output"))?; + + self.run_riverctl( + RiverUser::Bob, + &["invite", "accept", &invitation_code, "--nickname", "Bob"], + ) + .await?; + + self.send_message(RiverUser::Alice, "Soak test initialized") + .await?; + self.send_message(RiverUser::Bob, "Bob joined the soak test") + .await?; + Ok(()) + } + + async fn send_message(&self, user: RiverUser, body: &str) -> anyhow::Result<()> { + self.run_riverctl(user, &["message", "send", self.room_key.as_str(), body]) + .await + .map(|_| ()) + } + + async fn list_messages(&self, user: RiverUser) -> anyhow::Result<()> { + self.run_riverctl(user, &["message", "list", self.room_key.as_str()]) + .await + .map(|_| ()) + } + + async fn run_riverctl(&self, user: RiverUser, args: &[&str]) -> anyhow::Result { + let (url, config_dir) = match user { + RiverUser::Alice => (&self.alice_url, self.alice_dir.path()), + RiverUser::Bob => (&self.bob_url, self.bob_dir.path()), + }; + + const MAX_RETRIES: usize = 3; + const RETRY_DELAY: Duration = Duration::from_secs(5); + + for attempt in 1..=MAX_RETRIES { + let mut cmd = tokio::process::Command::new(&self.riverctl); + cmd.arg("--node-url").arg(url); + cmd.args(args); + cmd.env("RIVER_CONFIG_DIR", config_dir); + + let output = cmd + .output() + .await + .context("failed to execute riverctl command")?; + if output.status.success() { + return Ok(String::from_utf8_lossy(&output.stdout).to_string()); + } + + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let retriable = stderr.contains("Timeout waiting for") + || stderr.contains("connection refused") + || stderr.contains("HTTP request failed"); + if attempt == MAX_RETRIES || !retriable { + bail!("riverctl failed (user {:?}): {}", user, stderr); + } + println!( + "riverctl attempt {}/{} failed for {:?}: {}; retrying in {}s", + attempt, + MAX_RETRIES, + user, + stderr.trim(), + RETRY_DELAY.as_secs() + ); + sleep(RETRY_DELAY).await; + } + + unreachable!("riverctl retry loop should always return or bail") + } +} diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs index e130d93fb..3373d5cd0 100644 --- a/crates/core/tests/test_network_integration.rs +++ b/crates/core/tests/test_network_integration.rs @@ -7,7 +7,7 @@ use freenet_test_network::TestNetwork; use testresult::TestResult; use tokio_tungstenite::connect_async; -// Build a fresh network for each test to avoid static Sync requirements +// Helper to get or create network async fn get_network() -> TestNetwork { TestNetwork::builder() .gateways(1)