diff --git a/AGENTS.md b/AGENTS.md index 81c864d48..54150a2ee 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -157,6 +157,7 @@ Run these in any worktree before pushing a branch or opening a PR. ``` - Tests can share the static network and access `NETWORK.gateway(0).ws_url()` to communicate via `freenet_stdlib::client_api::WebApi`. - Run the crate’s suite with `cargo test -p freenet-test-network`. When `preserve_temp_dirs_on_failure(true)` is set, failing startups keep logs under `/tmp/freenet-test-network-/` for inspection. +- A larger soak test lives in `crates/core/tests/large_network.rs`. It is `#[ignore]` by default—run it manually with `cargo test -p freenet --test large_network -- --ignored --nocapture` once you have `riverctl` installed. The test writes diagnostics snapshots to the network’s `run_root()` directory for later analysis. ## Pull Requests & Reviews diff --git a/Cargo.lock b/Cargo.lock index 8ddd374c6..611aeb8bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,12 +1679,14 @@ dependencies = [ "pkcs8", "rand 0.9.2", "redb", + "regex", "reqwest", "rsa", "semver", "serde", "serde_json", "serde_with", + "sha2", "sqlx", "statrs", "stretto", @@ -1812,16 +1814,16 @@ dependencies = [ [[package]] name = "freenet-test-network" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d06be6aef3bb0433a963d0cc0c0f9b7d05e50b54fcb929e405fefab10d3b2db9" +version = "0.1.3" dependencies = [ "anyhow", "chrono", "freenet-stdlib", "futures 0.3.31", + "regex", "serde", "serde_json", + "ssh2", "sysinfo", "thiserror 1.0.69", "tokio", @@ -2842,6 +2844,20 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libssh2-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "220e4f05ad4a218192533b300327f5150e809b54c4ec83b5a1d91833601811b9" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", +] + [[package]] name = "libunwind" version = "1.3.3" @@ -4988,6 +5004,18 @@ dependencies = [ "url", ] +[[package]] +name = "ssh2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f84d13b3b8a0d4e91a2629911e951db1bb8671512f5c09d7d4ba34500ba68c8" +dependencies = [ + "bitflags 2.10.0", + "libc", + "libssh2-sys", + "parking_lot", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 665962391..906e36832 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -125,6 +125,8 @@ pub async fn base_node_test_config_with_rng( ignore_protocol_checking: true, address: Some(Ipv4Addr::LOCALHOST.into()), network_port: public_port, // if None, node will pick a free one or use default + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses, transient_budget: None, diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 803670669..f86cfcc81 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -65,6 +65,7 @@ xz2 = { version = "0.1" } reqwest = { version = "0.12", features = ["json"] } rsa = { version = "0.9", features = ["serde", "pem"] } pkcs8 = { version = "0.10", features = ["std", "pem"] } +sha2 = "0.10" # Tracing deps opentelemetry = "0.31" @@ -90,7 +91,7 @@ arbitrary = { features = ["derive"], version = "1" } chrono = { features = ["arbitrary"], workspace = true } freenet-stdlib = { features = ["net", "testing"], workspace = true } 
freenet-macros = { path = "../freenet-macros" } -freenet-test-network = "0.1.1" +freenet-test-network = { version = "0.1.3", path = "../../../../freenet-test-network" } httptest = "0.16" statrs = "0.18" tempfile = "3" @@ -100,6 +101,7 @@ tokio-tungstenite = "0.27.0" # console-subscriber = { version = "0.4" } ureq = { version = "3.1", features = ["json"] } which = "8.0" +regex = "1" [features] default = ["redb", "trace", "websocket"] diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 36072d01f..907a8fa75 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -102,6 +102,8 @@ impl Default for ConfigArgs { blocked_addresses: None, transient_budget: Some(DEFAULT_TRANSIENT_BUDGET), transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS), + min_connections: None, + max_connections: None, }, ws_api: WebsocketApiArgs { address: Some(default_listening_address()), @@ -243,6 +245,38 @@ impl ConfigArgs { self.ws_api .token_cleanup_interval_seconds .get_or_insert(cfg.ws_api.token_cleanup_interval_seconds); + self.network_api + .address + .get_or_insert(cfg.network_api.address); + self.network_api + .network_port + .get_or_insert(cfg.network_api.port); + if let Some(addr) = cfg.network_api.public_address { + self.network_api.public_address.get_or_insert(addr); + } + if let Some(port) = cfg.network_api.public_port { + self.network_api.public_port.get_or_insert(port); + } + if let Some(limit) = cfg.network_api.bandwidth_limit { + self.network_api.bandwidth_limit.get_or_insert(limit); + } + if let Some(addrs) = cfg.network_api.blocked_addresses { + self.network_api + .blocked_addresses + .get_or_insert_with(|| addrs.into_iter().collect()); + } + self.network_api + .transient_budget + .get_or_insert(cfg.network_api.transient_budget); + self.network_api + .transient_ttl_secs + .get_or_insert(cfg.network_api.transient_ttl_secs); + self.network_api + .min_connections + .get_or_insert(cfg.network_api.min_connections); + self.network_api + .max_connections + .get_or_insert(cfg.network_api.max_connections); self.log_level.get_or_insert(cfg.log_level); self.config_paths.merge(cfg.config_paths.as_ref().clone()); } @@ -374,6 +408,14 @@ impl ConfigArgs { .network_api .transient_ttl_secs .unwrap_or(DEFAULT_TRANSIENT_TTL_SECS), + min_connections: self + .network_api + .min_connections + .unwrap_or(DEFAULT_MIN_CONNECTIONS), + max_connections: self + .network_api + .max_connections + .unwrap_or(DEFAULT_MAX_CONNECTIONS), }, ws_api: WebsocketApiConfig { // the websocket API is always local @@ -565,6 +607,22 @@ pub struct NetworkArgs { #[arg(long, env = "TRANSIENT_TTL_SECS")] #[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")] pub transient_ttl_secs: Option<u64>, + + /// Minimum desired connections for the ring topology. Defaults to 10. + #[arg(long = "min-number-of-connections", env = "MIN_NUMBER_OF_CONNECTIONS")] + #[serde( + rename = "min-number-of-connections", + skip_serializing_if = "Option::is_none" + )] + pub min_connections: Option<usize>, + + /// Maximum allowed connections for the ring topology. Defaults to 20. + #[arg(long = "max-number-of-connections", env = "MAX_NUMBER_OF_CONNECTIONS")] + #[serde( + rename = "max-number-of-connections", + skip_serializing_if = "Option::is_none" + )] + pub max_connections: Option<usize>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -639,6 +697,20 @@ pub struct NetworkApiConfig { /// Time (in seconds) before an unpromoted transient connection is dropped.
#[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")] pub transient_ttl_secs: u64, + + /// Minimum desired connections for the ring topology. + #[serde( + default = "default_min_connections", + rename = "min-number-of-connections" + )] + pub min_connections: usize, + + /// Maximum allowed connections for the ring topology. + #[serde( + default = "default_max_connections", + rename = "max-number-of-connections" + )] + pub max_connections: usize, } mod port_allocation; @@ -656,6 +728,14 @@ fn default_transient_ttl_secs() -> u64 { DEFAULT_TRANSIENT_TTL_SECS } +fn default_min_connections() -> usize { + DEFAULT_MIN_CONNECTIONS +} + +fn default_max_connections() -> usize { + DEFAULT_MAX_CONNECTIONS +} + #[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)] pub struct WebsocketApiArgs { /// Address to bind to for the websocket API, default is 0.0.0.0 diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index e5446027b..16fabea01 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -192,8 +192,8 @@ impl NodeConfig { config: Arc::new(config.clone()), max_hops_to_live: None, rnd_if_htl_above: None, - max_number_conn: None, - min_number_conn: None, + max_number_conn: Some(config.network_api.max_connections), + min_number_conn: Some(config.network_api.min_connections), max_upstream_bandwidth: None, max_downstream_bandwidth: None, blocked_addresses: config.network_api.blocked_addresses.clone(), diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index d1c56b285..ef516a0ae 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -5,7 +5,7 @@ //! connection attempts to/from the event loop. Higher-level routing decisions now live inside //! `ConnectOp`. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -31,21 +31,21 @@ pub(crate) enum Event { transaction: Option<Transaction>, peer: Option<PeerId>, connection: PeerConnection, - courtesy: bool, + transient: bool, }, /// An outbound connection attempt succeeded. OutboundEstablished { transaction: Transaction, peer: PeerId, connection: PeerConnection, - courtesy: bool, + transient: bool, }, /// An outbound connection attempt failed. OutboundFailed { transaction: Transaction, peer: PeerId, error: ConnectionError, - courtesy: bool, + transient: bool, }, } @@ -56,13 +56,13 @@ pub(crate) enum Command { Connect { peer: PeerId, transaction: Transaction, - courtesy: bool, + transient: bool, }, /// Register expectation for an inbound connection from `peer`. ExpectInbound { peer: PeerId, transaction: Option<Transaction>, - courtesy: bool, + transient: bool, }, /// Remove state associated with `peer`.
DropConnection { peer: PeerId }, @@ -122,37 +169,69 @@ impl Stream for HandshakeHandler { struct ExpectedInbound { peer: PeerId, transaction: Option<Transaction>, - courtesy: bool, + transient: bool, // TODO: finish migrating the courtesy -> transient rename in the wire protocol } #[derive(Default)] struct ExpectedInboundTracker { - entries: HashMap<SocketAddr, ExpectedInbound>, + entries: HashMap<IpAddr, Vec<ExpectedInbound>>, } impl ExpectedInboundTracker { - fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, courtesy: bool) { - self.entries.insert( - peer.addr, - ExpectedInbound { + fn register(&mut self, peer: PeerId, transaction: Option<Transaction>, transient: bool) { + tracing::debug!( + remote = %peer.addr, + transient, + tx = ?transaction, + "ExpectInbound: registering expectation" + ); + self.entries + .entry(peer.addr.ip()) + .or_default() + .push(ExpectedInbound { peer, transaction, - courtesy, - }, - ); + transient, + }); } fn drop_peer(&mut self, peer: &PeerId) { - self.entries.remove(&peer.addr); + if let Some(list) = self.entries.get_mut(&peer.addr.ip()) { + list.retain(|entry| entry.peer != *peer); + if list.is_empty() { + self.entries.remove(&peer.addr.ip()); + } + } } fn consume(&mut self, addr: SocketAddr) -> Option<ExpectedInbound> { - self.entries.remove(&addr) + let ip = addr.ip(); + let list = self.entries.get_mut(&ip)?; + if let Some(pos) = list + .iter() + .position(|entry| entry.peer.addr.port() == addr.port()) + { + let entry = list.remove(pos); + if list.is_empty() { + self.entries.remove(&ip); + } + tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by exact port"); + return Some(entry); + } + let entry = list.pop(); + if list.is_empty() { + self.entries.remove(&ip); + } + if let Some(entry) = entry { + tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by IP fallback"); + return Some(entry); + } + None } #[cfg(test)] fn contains(&self, addr: SocketAddr) -> bool { - self.entries.contains_key(&addr) + self.entries.contains_key(&addr.ip()) } } @@ -170,12 +202,12 @@ async fn run_driver( loop { select!
{ command = commands_rx.recv() => match command { - Some(Command::Connect { peer, transaction, courtesy }) => { - spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, courtesy, peer_ready.clone()); - } - Some(Command::ExpectInbound { peer, transaction, courtesy }) => { - expected_inbound.register(peer, transaction, courtesy); + Some(Command::Connect { peer, transaction, transient }) => { + spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, transient, peer_ready.clone()); } + Some(Command::ExpectInbound { peer, transaction, transient }) => { + expected_inbound.register(peer, transaction, transient); + } Some(Command::DropConnection { peer }) => { expected_inbound.drop_peer(&peer); } @@ -190,8 +222,8 @@ async fn run_driver( let remote_addr = conn.remote_addr(); let entry = expected_inbound.consume(remote_addr); - let (peer, transaction, courtesy) = if let Some(entry) = entry { - (Some(entry.peer), entry.transaction, entry.courtesy) + let (peer, transaction, transient) = if let Some(entry) = entry { + (Some(entry.peer), entry.transaction, entry.transient) } else { (None, None, false) }; @@ -200,7 +232,7 @@ async fn run_driver( transaction, peer, connection: conn, - courtesy, + transient, }).await.is_err() { break; } @@ -217,7 +249,7 @@ fn spawn_outbound( events_tx: mpsc::Sender<Event>, peer: PeerId, transaction: Transaction, - courtesy: bool, + transient: bool, peer_ready: Option>, ) { tokio::spawn(async move { @@ -241,13 +273,13 @@ transaction, peer: peer.clone(), connection, - courtesy, + transient, }, Err(error) => Event::OutboundFailed { transaction, peer: peer.clone(), error, - courtesy, + transient, }, }; @@ -280,7 +312,7 @@ mod tests { .expect("expected registered inbound entry"); assert_eq!(entry.peer, peer); assert_eq!(entry.transaction, Some(tx)); - assert!(entry.courtesy); + assert!(entry.transient); assert!(tracker.consume(peer.addr).is_none()); } @@ -308,6 +340,6 @@ mod tests { .consume(peer.addr) .expect("entry should be present after overwrite"); assert_eq!(entry.transaction, Some(new_tx)); - assert!(entry.courtesy); + assert!(entry.transient); } } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 1284e2978..9f4f03e9b 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::net::UdpSocket; use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender}; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::Instrument; use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge}; @@ -635,6 +635,12 @@ impl P2pConnManager { "Failed to enqueue DropConnection command" ); } + // Immediately prune topology counters so we don't leak open connection slots.
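+ // (Per the ConnectionManager changes below, prune_connection removes the peer from the location buckets, or releases its pending reservation if it never went live.)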
+ ctx.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; if let Some(conn) = ctx.connections.remove(&peer) { // TODO: review: this could potentially leave garbage tasks in the background with peer listener match timeout( @@ -665,13 +671,13 @@ impl P2pConnManager { peer, tx, callback, - is_gw: courtesy, + is_gw: transient, } => { tracing::info!( tx = %tx, remote = %peer, remote_addr = %peer.addr, - courtesy, + transient, "NodeEvent::ConnectPeer received" ); ctx.handle_connect_peer( @@ -680,7 +686,7 @@ impl P2pConnManager { tx, &handshake_cmd_sender, &mut state, - courtesy, + transient, ) .await?; } @@ -691,7 +697,7 @@ impl P2pConnManager { .send(HandshakeCommand::ExpectInbound { peer: peer.clone(), transaction: None, - courtesy: false, + transient: false, }) .await { @@ -828,15 +834,23 @@ impl P2pConnManager { // Collect network information if config.include_network_info { - let connected_peers: Vec<_> = ctx - .connections - .keys() - .map(|p| (p.to_string(), p.addr.to_string())) - .collect(); + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peers = Vec::new(); + for conns in connections_by_loc.values() { + for conn in conns { + connected_peers.push(( + conn.location.peer.to_string(), + conn.location.peer.addr.to_string(), + )); + } + } + connected_peers.sort_by(|a, b| a.0.cmp(&b.0)); + connected_peers.dedup_by(|a, b| a.0 == b.0); response.network_info = Some(NetworkInfo { + active_connections: connected_peers.len(), connected_peers, - active_connections: ctx.connections.len(), }); } @@ -915,28 +929,43 @@ impl P2pConnManager { } } + // Collect topology-backed connection info (exclude transient transports). + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peer_ids = Vec::new(); + if config.include_detailed_peer_info { + use freenet_stdlib::client_api::ConnectedPeerInfo; + for conns in connections_by_loc.values() { + for conn in conns { + connected_peer_ids.push(conn.location.peer.to_string()); + response.connected_peers_detailed.push( + ConnectedPeerInfo { + peer_id: conn.location.peer.to_string(), + address: conn.location.peer.addr.to_string(), + }, + ); + } + } + } else { + for conns in connections_by_loc.values() { + connected_peer_ids.extend( + conns.iter().map(|c| c.location.peer.to_string()), + ); + } + } + connected_peer_ids.sort(); + connected_peer_ids.dedup(); + // Collect system metrics if config.include_system_metrics { let seeding_contracts = op_manager.ring.all_network_subscriptions().len() as u32; response.system_metrics = Some(SystemMetrics { - active_connections: ctx.connections.len() as u32, + active_connections: connected_peer_ids.len() as u32, seeding_contracts, }); } - // Collect detailed peer information if requested - if config.include_detailed_peer_info { - use freenet_stdlib::client_api::ConnectedPeerInfo; - // Populate detailed peer information from actual connections - for peer in ctx.connections.keys() { - response.connected_peers_detailed.push(ConnectedPeerInfo { - peer_id: peer.to_string(), - address: peer.addr.to_string(), - }); - } - } - match timeout( Duration::from_secs(2), callback.send(QueryResult::NodeDiagnostics(response)), @@ -1208,7 +1237,7 @@ impl P2pConnManager { tx: Transaction, handshake_commands: &HandshakeCommandSender, state: &mut EventListenerState, - courtesy: bool, + transient: bool, ) -> anyhow::Result<()> { let mut peer = peer; let mut peer_addr = peer.addr; @@ 
-1221,13 +1250,13 @@ impl P2pConnManager { tx = %tx, remote = %peer, fallback_addr = %peer_addr, - courtesy, + transient, "ConnectPeer provided unspecified address; using existing connection address" ); } else { tracing::debug!( tx = %tx, - courtesy, + transient, "ConnectPeer received unspecified address without existing connection reference" ); } @@ -1237,7 +1266,7 @@ impl P2pConnManager { tx = %tx, remote = %peer, remote_addr = %peer_addr, - courtesy, + transient, "Connecting to peer" ); if let Some(blocked_addrs) = &self.blocked_addresses { @@ -1268,30 +1297,78 @@ impl P2pConnManager { ); } - // If we already have a transport channel, reuse it instead of dialing again. This covers - // transient->normal promotion without tripping duplicate connection errors. + // If a transient transport already exists, promote it without dialing anew. if self.connections.contains_key(&peer) { tracing::info!( tx = %tx, remote = %peer, - courtesy, - "connect_peer: reusing existing transport" + transient, + "connect_peer: reusing existing transport / promoting transient if present" ); let connection_manager = &self.bridge.op_manager.ring.connection_manager; if let Some(entry) = connection_manager.drop_transient(&peer) { let loc = entry .location .unwrap_or_else(|| Location::from_address(&peer.addr)); + // Re-run admission + cap guard when promoting a transient connection. + let should_accept = connection_manager.should_accept(loc, &peer); + if !should_accept { + tracing::warn!( + tx = %tx, + %peer, + %loc, + "connect_peer: promotion rejected by admission logic" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify rejected-promotion callback" + ); + }) + .ok(); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + tx = %tx, + %peer, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "connect_peer: rejecting transient promotion to enforce cap" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify cap-rejection callback" + ); + }) + .ok(); + return Ok(()); + } self.bridge .op_manager .ring - .add_connection(loc, peer.clone(), false) + .add_connection(loc, peer.clone(), true) .await; tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } + // Return the remote peer we are connected to (not our own peer key). 
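+ // The requester treats this callback payload as the identity of the dialed peer, so it must carry the remote key rather than our own.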
+ let resolved_peer_id = peer.clone(); callback - .send_result(Ok((peer.clone(), None))) + .send_result(Ok((resolved_peer_id, None))) .await .inspect_err(|err| { tracing::debug!( @@ -1315,7 +1392,7 @@ tx = %tx, remote = %peer_addr, pending = callbacks.get().len(), - courtesy, + transient, "Connection already pending, queuing additional requester" ); callbacks.get_mut().push(callback); @@ -1324,7 +1401,7 @@ remote = %peer_addr, pending = callbacks.get().len(), pending_txs = ?txs_entry, - courtesy, + transient, "connect_peer: connection already pending, queued callback" ); return Ok(()); @@ -1335,7 +1412,7 @@ tracing::debug!( tx = %tx, remote = %peer_addr, - courtesy, + transient, "connect_peer: registering new pending connection" ); entry.insert(vec![callback]); @@ -1344,7 +1421,7 @@ remote = %peer_addr, pending = 1, pending_txs = ?txs_entry, - courtesy, + transient, "connect_peer: registered new pending connection" ); state.outbound_handler.expect_incoming(peer_addr); @@ -1355,14 +1432,14 @@ .send(HandshakeCommand::Connect { peer: peer.clone(), transaction: tx, - courtesy, + transient, }) .await { tracing::warn!( tx = %tx, remote = %peer.addr, - courtesy, + transient, ?error, "Failed to enqueue connect command" ); @@ -1377,7 +1454,7 @@ tx = %tx, remote = %peer_addr, callbacks = callbacks.len(), - courtesy, + transient, "Cleaning up callbacks after connect command failure" ); for mut cb in callbacks { @@ -1404,7 +1481,7 @@ tracing::debug!( tx = %tx, remote = %peer_addr, - courtesy, + transient, "connect_peer: handshake command dispatched" ); } @@ -1423,16 +1500,17 @@ transaction, peer, connection, - courtesy, + transient, } => { - let conn_manager = &self.bridge.op_manager.ring.connection_manager; + tracing::info!(provided = ?peer, transient = transient, tx = ?transaction, "InboundConnection event"); + let _conn_manager = &self.bridge.op_manager.ring.connection_manager; let remote_addr = connection.remote_addr(); if let Some(blocked_addrs) = &self.blocked_addresses { if blocked_addrs.contains(&remote_addr) { tracing::info!( remote = %remote_addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection blocked by local policy" ); @@ -1440,11 +1518,11 @@ } } - let provided_peer = peer.clone(); + let _provided_peer = peer.clone(); let peer_id = peer.unwrap_or_else(|| { tracing::info!( remote = %remote_addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection arrived without matching expectation; accepting provisionally" ); @@ -1462,13 +1540,14 @@ tracing::info!( remote = %peer_id.addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection established" ); - let is_transient = - conn_manager.is_gateway() && provided_peer.is_none() && transaction.is_none(); + // Trust the handshake's transient flag alone. Normal inbound dials (including + // gateway bootstrap from peers) should be promoted into the ring once established.
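+ // (The removed heuristic inferred transience from `is_gateway && no expected peer && no transaction`, which misclassified normal bootstrap dials.)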
+ let is_transient = transient; self.handle_successful_connection(peer_id, connection, state, None, is_transient) .await?; @@ -1477,11 +1556,11 @@ impl P2pConnManager { transaction, peer, connection, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - courtesy, + transient = transient, transaction = %transaction, "Outbound connection established" ); @@ -1492,11 +1571,11 @@ impl P2pConnManager { transaction, peer, error, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - courtesy, + transient = transient, transaction = %transaction, ?error, "Outbound connection failed" @@ -1518,7 +1597,7 @@ impl P2pConnManager { remote = %peer.addr, callbacks = callbacks.len(), pending_txs = ?pending_txs, - courtesy, + transient, "Notifying callbacks after outbound failure" ); @@ -1610,9 +1689,14 @@ impl P2pConnManager { current = connection_manager.transient_count(), "Transient connection budget exhausted; dropping inbound connection" ); + if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) { + for mut cb in callbacks { + let _ = cb.send_result(Err(())).await; + } + } + state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } - let pending_txs = state .awaiting_connection_txs .remove(&peer_id.addr) @@ -1675,6 +1759,19 @@ impl P2pConnManager { // Only insert if connection doesn't already exist to avoid dropping existing channel let mut newly_inserted = false; if !self.connections.contains_key(&peer_id) { + if is_transient { + let cm = &self.bridge.op_manager.ring.connection_manager; + let current = cm.transient_count(); + if current >= cm.transient_budget() { + tracing::warn!( + remote = %peer_id.addr, + budget = cm.transient_budget(), + current, + "Transient connection budget exhausted; dropping inbound connection before insert" + ); + return Ok(()); + } + } let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); @@ -1691,13 +1788,36 @@ impl P2pConnManager { } if newly_inserted { + tracing::info!(remote = %peer_id, is_transient, "handle_successful_connection: inserted new connection entry"); let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); if !is_transient { let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); + // Re-apply admission logic on promotion to avoid bypassing capacity/heuristic checks. + let should_accept = connection_manager.should_accept(loc, &peer_id); + if !should_accept { + tracing::warn!( + %peer_id, + %loc, + "handle_successful_connection: promotion rejected by admission logic" + ); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + %peer_id, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "handle_successful_connection: rejecting new connection to enforce cap" + ); + return Ok(()); + } + tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring"); self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), false) + .add_connection(loc, peer_id.clone(), true) .await; } else { // Update location now that we know it; budget was reserved before any work. 
@@ -1711,14 +1831,18 @@ let cm = connection_manager.clone(); let peer = peer_id.clone(); tokio::spawn(async move { - tokio::time::sleep(ttl).await; + sleep(ttl).await; if cm.drop_transient(&peer).is_some() { tracing::info!(%peer, "Transient connection expired; dropping"); if let Err(err) = drop_tx .send(Right(NodeEvent::DropConnection(peer.clone()))) .await { - tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient"); + tracing::warn!( + %peer, + ?err, + "Failed to dispatch DropConnection for expired transient" + ); } } }); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index f0d055715..4cf93d79b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -86,9 +86,9 @@ impl fmt::Display for ConnectMsg { ), ConnectMsg::Response { sender, target, payload, .. } => write!( f, - "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {}, courtesy: {} }}", + "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {}, transient: {} }}", payload.acceptor, - payload.courtesy + payload.transient ), ConnectMsg::ObservedAddress { target, address, .. } => { write!(f, "ObservedAddress {{ target: {target}, address: {address} }}") } @@ -126,8 +126,8 @@ pub(crate) struct ConnectRequest { pub(crate) struct ConnectResponse { /// The peer that accepted the join request. pub acceptor: PeerKeyLocation, - /// Whether this acceptance is a short-lived courtesy link. - pub courtesy: bool, + /// Whether this acceptance is a short-lived transient link. + pub transient: bool, } /// New minimal state machine the joiner tracks. @@ -154,7 +154,7 @@ pub(crate) struct RelayState { pub upstream: PeerKeyLocation, pub request: ConnectRequest, pub forwarded_to: Option<PeerKeyLocation>, - pub courtesy_hint: bool, + pub transient_hint: bool, pub observed_sent: bool, pub accepted_locally: bool, } @@ -175,8 +175,8 @@ pub(crate) trait RelayContext { visited: &[PeerKeyLocation], ) -> Option<PeerKeyLocation>; - /// Whether the acceptance should be treated as a short-lived courtesy link. - fn courtesy_hint(&self, acceptor: &PeerKeyLocation, joiner: &PeerKeyLocation) -> bool; + /// Whether the acceptance should be treated as a short-lived transient link. + fn transient_hint(&self, acceptor: &PeerKeyLocation, joiner: &PeerKeyLocation) -> bool; } /// Result of processing a request at a relay.
@@ -215,22 +215,35 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner); - self.courtesy_hint = courtesy; + let dist = ring_distance(acceptor.location, self.request.joiner.location); + let transient = ctx.transient_hint(&acceptor, &self.request.joiner); + self.transient_hint = transient; actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), - courtesy, + transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); + tracing::info!( + acceptor_peer = %acceptor.peer, + joiner_peer = %self.request.joiner.peer, + acceptor_loc = ?acceptor.location, + joiner_loc = ?self.request.joiner.location, + ring_distance = ?dist, + transient, + "connect: acceptance issued" + ); } if self.forwarded_to.is_none() && self.request.ttl > 0 { match ctx.select_next_hop(self.request.desired_location, &self.request.visited) { Some(next) => { - tracing::debug!( + let dist = ring_distance(next.location, Some(self.request.desired_location)); + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, next_peer = %next.peer, + next_loc = ?next.location, + ring_distance_to_target = ?dist, "connect: forwarding join request to next hop" ); let mut forward_req = self.request.clone(); @@ -242,7 +255,7 @@ impl RelayState { actions.forward = Some((next, forward_snapshot)); } None => { - tracing::debug!( + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, visited = ?self.request.visited, @@ -299,10 +312,10 @@ impl RelayContext for RelayEnv<'_> { .routing(desired_location, None, skip, &router) } - fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { + fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { // Courtesy slots still piggyback on regular connections. Flag the first acceptance so the - // joiner can prioritise it, and keep the logic simple until dedicated courtesy tracking - // is wired in (see courtesy-connection-budget branch). + // joiner can prioritise it, and keep the logic simple until dedicated transient tracking + // is wired in (see transient-connection-budget branch). 
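+ // Concretely: only a relay that currently has no open connections flags its acceptance this way.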
self.op_manager.ring.open_connections() == 0 } } @@ -310,7 +323,7 @@ #[derive(Debug)] pub struct AcceptedPeer { pub peer: PeerKeyLocation, - pub courtesy: bool, + pub transient: bool, } #[derive(Debug, Default)] @@ -331,7 +344,7 @@ impl JoinerState { self.last_progress = now; acceptance.new_acceptor = Some(AcceptedPeer { peer: response.acceptor.clone(), - courtesy: response.courtesy, + transient: response.transient, }); acceptance.assigned_location = self.accepted.len() == 1; } @@ -391,7 +404,7 @@ impl ConnectOp { upstream, request, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, })); @@ -507,7 +520,7 @@ upstream: upstream.clone(), request: request.clone(), forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }))); @@ -666,7 +679,7 @@ impl Operation for ConnectOp { peer: new_acceptor.peer.peer.clone(), tx: self.id, callback, - is_gw: new_acceptor.courtesy, + is_gw: new_acceptor.transient, }) .await?; @@ -768,6 +781,13 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option<ConnectMsg>) - } } +fn ring_distance(a: Option<Location>, b: Option<Location>) -> Option<f64> { + match (a, b) { + (Some(a), Some(b)) => Some(a.distance(b).as_f64()), + _ => None, + } +} + #[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option<ExponentialBackoff>, @@ -960,7 +980,7 @@ mod tests { self_loc: PeerKeyLocation, accept: bool, next_hop: Option<PeerKeyLocation>, - courtesy: bool, + transient: bool, } impl TestRelayContext { fn new(self_loc: PeerKeyLocation) -> Self { Self { self_loc, accept: true, next_hop: None, - courtesy: false, + transient: false, } } @@ -983,8 +1003,8 @@ self } - fn courtesy(mut self, courtesy: bool) -> Self { - self.courtesy = courtesy; + fn transient(mut self, transient: bool) -> Self { + self.transient = transient; self } } @@ -1006,8 +1026,8 @@ self.next_hop.clone() } - fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { - self.courtesy + fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { + self.transient } } @@ -1034,17 +1054,17 @@ observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }; - let ctx = TestRelayContext::new(self_loc.clone()).courtesy(true); + let ctx = TestRelayContext::new(self_loc.clone()).transient(true); let actions = state.handle_request(&ctx, &joiner); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); - assert!(response.courtesy); + assert!(response.transient); assert_eq!(actions.expect_connection_from.unwrap().peer, joiner.peer); assert!(actions.forward.is_none()); } @@ -1064,7 +1084,7 @@ observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1099,7 +1119,7 @@ observed_addr: Some(observed_addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1127,13 +1147,13 @@ let response = ConnectResponse { acceptor: acceptor.clone(), - courtesy: false, + transient: false, }; let result = state.register_acceptance(&response, Instant::now()); assert!(result.satisfied); let new = result.new_acceptor.expect("expected new acceptor");
assert_eq!(new.peer.peer, acceptor.peer); - assert!(!new.courtesy); + assert!(!new.transient); } #[test] diff --git a/crates/core/src/ring/connection.rs b/crates/core/src/ring/connection.rs index 2629886d0..270d3c9e9 100644 --- a/crates/core/src/ring/connection.rs +++ b/crates/core/src/ring/connection.rs @@ -1,8 +1,6 @@ use super::PeerKeyLocation; -use std::time::Instant; #[derive(Clone, Debug)] pub struct Connection { pub(crate) location: PeerKeyLocation, - pub(crate) open_at: Instant, } diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 5e3f19240..214f9715f 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -2,24 +2,27 @@ use dashmap::DashMap; use parking_lot::Mutex; use rand::prelude::IndexedRandom; use std::collections::{btree_map::Entry, BTreeMap}; +use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; use crate::topology::{Limits, TopologyManager}; use super::*; -use std::time::{Duration, Instant}; +/// Entry tracking a transient connection that hasn't been added to the ring topology yet. +/// Transient connections are typically unsolicited inbound connections to gateways. #[derive(Clone)] pub(crate) struct TransientEntry { #[allow(dead_code)] pub opened_at: Instant, + /// Advertised location for the transient peer, if known at admission time. pub location: Option<Location>, } #[derive(Clone)] pub(crate) struct ConnectionManager { - open_connections: Arc<AtomicUsize>, - reserved_connections: Arc<AtomicUsize>, + pending_reservations: Arc<RwLock<BTreeMap<PeerId, Location>>>, pub(super) location_for_peer: Arc<RwLock<BTreeMap<PeerId, Location>>>, pub(super) topology_manager: Arc<RwLock<TopologyManager>>, connections_by_location: Arc<RwLock<BTreeMap<Location, Vec<Connection>>>>, @@ -120,8 +123,7 @@ impl ConnectionManager { Self { connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), - open_connections: Arc::new(AtomicUsize::new(0)), - reserved_connections: Arc::new(AtomicUsize::new(0)), + pending_reservations: Arc::new(RwLock::new(BTreeMap::new())), topology_manager, own_location: own_location.into(), peer_key: Arc::new(Mutex::new(peer_id)), @@ -144,12 +146,8 @@ impl ConnectionManager { /// Will panic if the node checking for this condition has no location assigned.
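+ /// Accepting reserves a slot in `pending_reservations`; the slot is released again when the peer is promoted via `add_connection` or cleaned up via `prune_connection`.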
pub fn should_accept(&self, location: Location, peer_id: &PeerId) -> bool { tracing::info!("Checking if should accept connection"); - let open = self - .open_connections - .load(std::sync::atomic::Ordering::SeqCst); - let reserved_before = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst); + let open = self.connection_count(); + let reserved_before = self.pending_reservations.read().len(); tracing::info!( %peer_id, @@ -171,34 +169,16 @@ impl ConnectionManager { ); } - let reserved_before = loop { - let current = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst); - if current == usize::MAX { - tracing::error!( - %peer_id, - "reserved connection counter overflowed; rejecting new connection" - ); - return false; - } - match self.reserved_connections.compare_exchange( - current, - current + 1, - std::sync::atomic::Ordering::SeqCst, - std::sync::atomic::Ordering::SeqCst, - ) { - Ok(_) => break current, - Err(actual) => { - tracing::debug!( - %peer_id, - expected = current, - actual, - "reserved connection counter changed concurrently; retrying" - ); - } - } - }; + if self.location_for_peer.read().get(peer_id).is_some() { + // We've already accepted this peer (pending or active); treat as a no-op acceptance. + tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + return true; + } + + { + let mut pending = self.pending_reservations.write(); + pending.insert(peer_id.clone(), location); + } let total_conn = match reserved_before .checked_add(1) @@ -212,8 +192,7 @@ impl ConnectionManager { open, "connection counters would overflow; rejecting connection" ); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); return false; } }; @@ -234,19 +213,12 @@ impl ConnectionManager { limit = GATEWAY_DIRECT_ACCEPT_LIMIT, "Gateway reached direct-accept limit; forwarding join request instead" ); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); return false; } } - if self.location_for_peer.read().get(peer_id).is_some() { - // We've already accepted this peer (pending or active); treat as a no-op acceptance. 
- tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); - return true; - } - let accepted = if total_conn < self.min_connections { tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); true @@ -273,14 +245,13 @@ impl ConnectionManager { accepted, total_conn, open_connections = open, - reserved_connections = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst), + reserved_connections = self.pending_reservations.read().len(), + max_connections = self.max_connections, + min_connections = self.min_connections, "should_accept: final decision" ); if !accepted { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); } else { tracing::info!(%peer_id, total_conn, "should_accept: accepted (reserving spot)"); self.record_pending_location(peer_id, location); @@ -354,6 +325,7 @@ impl ConnectionManager { self.peer_key.lock().clone() } + #[allow(dead_code)] pub fn is_gateway(&self) -> bool { self.is_gateway } @@ -405,19 +377,22 @@ impl ConnectionManager { removed } - #[allow(dead_code)] + /// Check whether a peer is currently tracked as transient. pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } + /// Current number of tracked transient connections. pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) } + /// Maximum transient slots allowed. pub fn transient_budget(&self) -> usize { self.transient_budget } + /// Time-to-live for transients before automatic drop. pub fn transient_ttl(&self) -> Duration { self.transient_ttl } @@ -445,20 +420,46 @@ impl ConnectionManager { tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology"); debug_assert!(self.get_peer_key().expect("should be set") != peer); if was_reserved { - let old = self - .reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - #[cfg(debug_assertions)] - { - tracing::debug!(old, "Decremented reserved connections"); - if old == 0 { - panic!("Underflow of reserved connections"); + self.pending_reservations.write().remove(&peer); + } + let mut lop = self.location_for_peer.write(); + let previous_location = lop.insert(peer.clone(), loc); + drop(lop); + + // Enforce the global cap when adding a new peer (not a relocation). + if previous_location.is_none() && self.connection_count() >= self.max_connections { + tracing::warn!( + %peer, + %loc, + max = self.max_connections, + "add_connection: rejecting new connection to enforce cap" + ); + // Roll back bookkeeping since we're refusing the connection. 
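+ // (The pending reservation was already released at the top of this function, so the repeated remove below is only a defensive no-op.)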
+ self.location_for_peer.write().remove(&peer); + if was_reserved { + self.pending_reservations.write().remove(&peer); + } + return; + } + + if let Some(prev_loc) = previous_location { + tracing::info!( + %peer, + %prev_loc, + %loc, + "add_connection: replacing existing connection for peer" + ); + let mut cbl = self.connections_by_location.write(); + if let Some(prev_list) = cbl.get_mut(&prev_loc) { + if let Some(pos) = prev_list.iter().position(|c| c.location.peer == peer) { + prev_list.swap_remove(pos); + } + if prev_list.is_empty() { + cbl.remove(&prev_loc); } } - let _ = old; } - let mut lop = self.location_for_peer.write(); - lop.insert(peer.clone(), loc); + { let mut cbl = self.connections_by_location.write(); cbl.entry(loc).or_default().push(Connection { @@ -466,12 +467,8 @@ impl ConnectionManager { peer: peer.clone(), location: Some(loc), }, - open_at: Instant::now(), }); } - self.open_connections - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - std::mem::drop(lop); } pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool { @@ -511,7 +508,6 @@ impl ConnectionManager { peer: new_peer, location: Some(loc), }, - open_at: Instant::now(), }); } @@ -529,8 +525,13 @@ impl ConnectionManager { tracing::debug!("no location found for peer, skip pruning"); return None; } else { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + let removed = self.pending_reservations.write().remove(peer).is_some(); + if !removed { + tracing::warn!( + %peer, + "prune_connection: no pending reservation to release for in-transit peer" + ); + } } return None; }; @@ -542,23 +543,23 @@ impl ConnectionManager { } } - if is_alive { - self.open_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - } else { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + if !is_alive { + self.pending_reservations.write().remove(peer); } Some(loc) } - pub(super) fn get_open_connections(&self) -> usize { - self.open_connections - .load(std::sync::atomic::Ordering::SeqCst) + pub(crate) fn connection_count(&self) -> usize { + // Count only established connections tracked by location buckets. 
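+ // Pending reservations are tracked separately and deliberately excluded from this count.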
+ self.connections_by_location + .read() + .values() + .map(|conns| conns.len()) + .sum() } - pub(super) fn get_connections_by_location(&self) -> BTreeMap<Location, Vec<Connection>> { + pub(crate) fn get_connections_by_location(&self) -> BTreeMap<Location, Vec<Connection>> { self.connections_by_location.read().clone() } @@ -575,8 +576,20 @@ router: &Router, ) -> Option<PeerKeyLocation> { let connections = self.connections_by_location.read(); + tracing::debug!( + total_locations = connections.len(), + self_peer = self + .get_peer_key() + .as_ref() + .map(|id| id.to_string()) + .unwrap_or_else(|| "unknown".into()), + "routing: considering connections" + ); let peers = connections.values().filter_map(|conns| { let conn = conns.choose(&mut rand::rng())?; + if self.is_transient(&conn.location.peer) { + return None; + } if let Some(requester) = requesting { if requester == &conn.location.peer { return None; diff --git a/crates/core/src/ring/live_tx.rs b/crates/core/src/ring/live_tx.rs index 2a0988a1e..d9750683f 100644 --- a/crates/core/src/ring/live_tx.rs +++ b/crates/core/src/ring/live_tx.rs @@ -45,4 +45,8 @@ impl LiveTransactionTracker { pub(crate) fn still_alive(&self, tx: &Transaction) -> bool { self.tx_per_peer.iter().any(|e| e.value().contains(tx)) } + + pub(crate) fn len(&self) -> usize { + self.tx_per_peer.len() + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 85a875bdd..210b16e39 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -3,8 +3,7 @@ //! Mainly maintains a healthy and optimal pool of connections to other peers in the network //! and routes requests to the optimal peers. -use std::collections::{BTreeSet, HashSet}; -use std::net::SocketAddr; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, }; @@ -158,7 +157,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager.get_open_connections() + self.connection_manager.connection_count() } async fn refresh_router(router: Arc<RwLock<Router>>, register: ER) { @@ -384,11 +383,6 @@ impl Ring { tracing::info!("Initializing connection maintenance task"); let is_gateway = self.is_gateway; tracing::info!(is_gateway, "Connection maintenance task starting"); - #[cfg(not(test))] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); - #[cfg(test)] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(5); - #[cfg(not(test))] const CHECK_TICK_DURATION: Duration = Duration::from_secs(60); #[cfg(test)] @@ -459,7 +453,7 @@ impl Ring { error })?; if live_tx.is_none() { - let conns = self.connection_manager.get_open_connections(); + let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns ); @@ -476,30 +470,50 @@ impl Ring { } } - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let pending_connection_targets = pending_conn_adds.len(); - let neighbor_locations = { - let peers = self.connection_manager.get_connections_by_location(); - tracing::debug!( - "Maintenance task: current connections = {}, checking topology", - current_connections + let peers = self.connection_manager.get_connections_by_location(); + let connections_considered: usize = peers.values().map(|c| c.len()).sum(); + + let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers + .iter() + .map(|(loc, conns)| { + let conns: Vec<_> = conns + .iter() + .filter(|conn| !live_tx_tracker.has_live_connection(&conn.location.peer))
+ .cloned() + .collect(); + (*loc, conns) + }) + .filter(|(_, conns)| !conns.is_empty()) + .collect(); + + if neighbor_locations.is_empty() && connections_considered > 0 { + tracing::warn!( + current_connections, + connections_considered, + live_tx_peers = live_tx_tracker.len(), + "Neighbor filtering removed all candidates; using all connections" + ); + + neighbor_locations = peers + .iter() - .map(|(loc, conns)| { - let conns: Vec<_> = conns - .iter() - .filter(|conn| { - conn.open_at.elapsed() > CONNECTION_AGE_THRESOLD - && !live_tx_tracker.has_live_connection(&conn.location.peer) - }) - .cloned() - .collect(); - (*loc, conns) - }) + .map(|(loc, conns)| (*loc, conns.clone())) .filter(|(_, conns)| !conns.is_empty()) - .collect() - }; + .collect(); + } + + if current_connections > self.connection_manager.max_connections { + // When over capacity, consider all connections for removal regardless of live_tx filter. + neighbor_locations = peers.clone(); + } + + tracing::debug!( + "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + current_connections, + peers.len(), + live_tx_tracker.len() + ); let adjustment = self .connection_manager @@ -588,7 +602,7 @@ impl Ring { live_tx_tracker: &LiveTransactionTracker, op_manager: &Arc<OpManager>, ) -> anyhow::Result<Option<Transaction>> { - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let is_gateway = self.is_gateway; tracing::info!( @@ -604,13 +618,18 @@ %ideal_location, num_connections, skip_list_size = skip_list.len(), + self_peer = %self.connection_manager.get_peer_key().as_ref().map(|id| id.to_string()).unwrap_or_else(|| "unknown".into()), "Looking for peer to route through" ); if let Some(target) = self.connection_manager .routing(ideal_location, None, skip_list, &router) { - tracing::debug!(query_target = %target, "Found routing target"); + tracing::info!( + query_target = %target, + %ideal_location, + "connection_maintenance selected routing target" + ); target } else { tracing::warn!( diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 6f7f29b55..447302010 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, HashMap}, time::Instant, }; -use tracing::{debug, error, event, info, span, Level}; +use tracing::{debug, error, event, info, span, warn, Level}; pub mod connection_evaluator; mod constants; @@ -449,6 +449,28 @@ impl TopologyManager { } } + if current_connections > self.limits.max_connections { + let mut adj = adjustment.unwrap_or(TopologyAdjustment::NoChange); + if matches!(adj, TopologyAdjustment::NoChange) { + if let Some(peer) = select_fallback_peer_to_drop(neighbor_locations, my_location) { + info!( + current_connections, + max_allowed = self.limits.max_connections, + %peer.peer, + "Enforcing max-connections cap via fallback removal" + ); + adj = TopologyAdjustment::RemoveConnections(vec![peer]); + } else { + warn!( + current_connections, + max_allowed = self.limits.max_connections, + "Over capacity but no removable peer found; leaving topology unchanged" + ); + } + } + return adj; + } + adjustment.unwrap_or(TopologyAdjustment::NoChange) } @@ -480,6 +502,11 @@ impl TopologyManager { &mut self, neighbor_locations: &BTreeMap<Location, Vec<Connection>>, ) -> anyhow::Result<TopologyAdjustment> { + if neighbor_locations.is_empty() { + tracing::warn!("select_connections_to_add: neighbor map empty; skipping adjustment");
skipping adjustment"); + return Ok(TopologyAdjustment::NoChange); + } + let function_span = span!(Level::INFO, "add_connections"); let _enter = function_span.enter(); @@ -579,6 +606,30 @@ impl TopologyManager { } } +fn select_fallback_peer_to_drop( + neighbor_locations: &BTreeMap>, + my_location: &Option, +) -> Option { + let mut candidate: Option<(PeerKeyLocation, f64)> = None; + for (loc, conns) in neighbor_locations.iter() { + for conn in conns { + let score = match my_location { + Some(me) => me.distance(*loc).as_f64(), + None => 0.0, + }; + if let Some((_, best_score)) = &mut candidate { + if score > *best_score { + *best_score = score; + candidate = Some((conn.location.clone(), score)); + } + } else { + candidate = Some((conn.location.clone(), score)); + } + } + } + candidate.map(|(peer, _)| peer) +} + #[derive(PartialEq, Debug, Clone, Copy)] pub(crate) enum ConnectionAcquisitionStrategy { /// Acquire new connections slowly, be picky diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index c9aa84132..7b26e75fd 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -416,46 +416,41 @@ impl UdpPacketsListener { continue; } - if !self.is_gateway { - let allow = self.expected_non_gateway.contains(&remote_addr.ip()); - let gateway_allow = self - .known_gateway_addrs - .as_ref() - .map(|set| set.contains(&remote_addr)) - .unwrap_or(false); - if !allow && gateway_allow { - tracing::debug!( - %remote_addr, - "allowing inbound handshake from known gateway without prior expectation" - ); - } - if !allow && !gateway_allow { - tracing::warn!( - %remote_addr, - %size, - "unexpected packet from non-gateway node; dropping intro packet" - ); - self.expected_non_gateway.insert(remote_addr.ip()); + let is_known_gateway = self + .known_gateway_addrs + .as_ref() + .map(|set| set.contains(&remote_addr)) + .unwrap_or(false); + + if self.is_gateway || is_known_gateway { + // Handle gateway-intro packets (peer -> gateway) + + // Check if we already have a gateway connection in progress + if ongoing_gw_connections.contains_key(&remote_addr) { + tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); continue; } - } - // Check if we already have a gateway connection in progress - if ongoing_gw_connections.contains_key(&remote_addr) { - tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); + let inbound_key_bytes = key_from_addr(&remote_addr); + let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); + let task = tokio::spawn(gw_ongoing_connection + .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) + .map_err(move |error| { + tracing::warn!(%remote_addr, %error, "gateway connection error"); + (error, remote_addr) + })); + ongoing_gw_connections.insert(remote_addr, packets_sender); + gw_connection_tasks.push(task); + continue; + } else { + // Non-gateway peers: mark as expected and wait for the normal peer handshake flow. 
+ self.expected_non_gateway.insert(remote_addr.ip()); + tracing::debug!( + %remote_addr, + "unexpected peer intro; marking expected_non_gateway" + ); continue; } - - let inbound_key_bytes = key_from_addr(&remote_addr); - let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); - let task = tokio::spawn(gw_ongoing_connection - .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) - .map_err(move |error| { - tracing::warn!(%remote_addr, %error, "gateway connection error"); - (error, remote_addr) - })); - ongoing_gw_connections.insert(remote_addr, packets_sender); - gw_connection_tasks.push(task); } Err(e) => { tracing::error!("Failed to receive UDP packet: {:?}", e); diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 79cb3673b..342d3791e 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -6,6 +6,7 @@ use rsa::{ Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey, }; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct TransportKeypair { @@ -112,15 +113,8 @@ impl std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - if encoded.as_bytes().len() >= 16 { - let bytes = encoded.as_bytes(); - let first_six = &bytes[..6]; - let last_six = &bytes[bytes.len() - 6..]; - let to_encode = [first_six, last_six].concat(); - write!(f, "{}", bs58::encode(to_encode).into_string()) - } else { - write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) - } + let digest = Sha256::digest(encoded.as_bytes()); + write!(f, "{}", bs58::encode(digest).into_string()) } } diff --git a/crates/core/tests/connection_cap.rs b/crates/core/tests/connection_cap.rs new file mode 100644 index 000000000..82186c451 --- /dev/null +++ b/crates/core/tests/connection_cap.rs @@ -0,0 +1,38 @@ +//! Minimal repro harness for connection-cap enforcement. +//! +//! This test spins up a tiny network (2 gateways + 6 peers) with a low max-connections +//! setting (min=4, max=6) and waits for connectivity. It then inspects diagnostics to +//! ensure no peer reports more than `max` connections. This is intended to quickly catch +//! admission/cap bypass regressions without running the full soak.
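+//!
+//! Run it directly with `cargo test -p freenet --test connection_cap`.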
+
+use freenet_test_network::{BuildProfile, FreenetBinary, NetworkBuilder};
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn connection_cap_respected() -> anyhow::Result<()> {
+    let max_connections = 6usize;
+    let net = NetworkBuilder::new()
+        .gateways(2)
+        .peers(6)
+        .min_connections(4)
+        .max_connections(max_connections)
+        .start_stagger(std::time::Duration::from_millis(300))
+        .require_connectivity(0.9)
+        .connectivity_timeout(std::time::Duration::from_secs(40))
+        .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug))
+        .build()
+        .await?;
+
+    let snapshot = net.collect_diagnostics().await?;
+    for peer in snapshot.peers {
+        let count = peer.connected_peer_ids.len();
+        assert!(
+            count <= max_connections,
+            "peer {} exceeds max connections ({} > {})",
+            peer.peer_id,
+            count,
+            max_connections
+        );
+    }
+
+    Ok(())
+}
diff --git a/crates/core/tests/error_notification.rs b/crates/core/tests/error_notification.rs
index 7f59a642c..392a78019 100644
--- a/crates/core/tests/error_notification.rs
+++ b/crates/core/tests/error_notification.rs
@@ -298,6 +298,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> {
         ignore_protocol_checking: true,
         address: Some(Ipv4Addr::LOCALHOST.into()),
         network_port: Some(gateway_port),
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,
@@ -349,6 +351,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> {
         ignore_protocol_checking: true,
         address: Some(Ipv4Addr::LOCALHOST.into()),
         network_port: None,
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,
diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs
new file mode 100644
index 000000000..53bf5eb46
--- /dev/null
+++ b/crates/core/tests/large_network.rs
@@ -0,0 +1,327 @@
+//! Large-scale soak test using `freenet-test-network`.
+//!
+//! This test intentionally spins up a sizable network (2 gateways + N peers) and exercises the
+//! cluster for several minutes while capturing diagnostics snapshots and running River client
+//! workflows via `riverctl`.
+//!
+//! ## Running Manually
+//! ```text
+//! cargo test -p freenet --test large_network -- --ignored --nocapture
+//! ```
+//! Environment overrides:
+//! - `SOAK_PEER_COUNT` – number of non-gateway peers (default: 38).
+//! - `SOAK_SNAPSHOT_INTERVAL_SECS` – seconds between diagnostics snapshots (default: 60).
+//! - `SOAK_SNAPSHOT_ITERATIONS` – number of snapshots to capture (default: 5).
+//! - `SOAK_SNAPSHOT_WARMUP_SECS` – seconds to wait before the first snapshot (default: 60).
+//! - `SOAK_CONNECTIVITY_TARGET` – minimum ratio of peers that must report >=1 connection (default: 0.75).
+//! - `SOAK_MIN_CONNECTIONS` / `SOAK_MAX_CONNECTIONS` – ring connection bounds (defaults: 5 / 7).
+//!
+//! Requirements:
+//! - `riverctl` must be installed and in PATH (`cargo install riverctl`).
+//! - Enough CPU/RAM to host ~40 peers locally.
+//!
+//! The snapshots are stored under the network's `run_root()/large-soak/` directory for later
+//! inspection or visualization.
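+//!
+//! Example with overrides (values are illustrative):
+//! ```text
+//! SOAK_PEER_COUNT=20 SOAK_SNAPSHOT_ITERATIONS=3 \
+//!     cargo test -p freenet --test large_network -- --ignored --nocapture
+//! ```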
+
+use anyhow::{anyhow, bail, ensure, Context};
+use freenet_test_network::{BuildProfile, FreenetBinary, TestNetwork};
+use regex::Regex;
+use serde_json::to_string_pretty;
+use std::{
+    env, fs,
+    path::PathBuf,
+    time::{Duration, Instant},
+};
+use tempfile::TempDir;
+use tokio::time::sleep;
+use which::which;
+
+const DEFAULT_PEER_COUNT: usize = 38;
+const DEFAULT_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60);
+const DEFAULT_SNAPSHOT_ITERATIONS: usize = 5;
+const DEFAULT_SNAPSHOT_WARMUP: Duration = Duration::from_secs(60);
+const DEFAULT_CONNECTIVITY_TARGET: f64 = 0.75;
+const DEFAULT_MIN_CONNECTIONS: usize = 5;
+const DEFAULT_MAX_CONNECTIONS: usize = 7;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[ignore = "Large soak test - run manually (see file header for instructions)"]
+async fn large_network_soak() -> anyhow::Result<()> {
+    let peer_count = env::var("SOAK_PEER_COUNT")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .unwrap_or(DEFAULT_PEER_COUNT);
+    let snapshot_interval = env::var("SOAK_SNAPSHOT_INTERVAL_SECS")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .map(Duration::from_secs)
+        .unwrap_or(DEFAULT_SNAPSHOT_INTERVAL);
+    let snapshot_iterations = env::var("SOAK_SNAPSHOT_ITERATIONS")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .unwrap_or(DEFAULT_SNAPSHOT_ITERATIONS);
+    let connectivity_target = env::var("SOAK_CONNECTIVITY_TARGET")
+        .ok()
+        .and_then(|val| val.parse::<f64>().ok())
+        .unwrap_or(DEFAULT_CONNECTIVITY_TARGET);
+    let snapshot_warmup = env::var("SOAK_SNAPSHOT_WARMUP_SECS")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .map(Duration::from_secs)
+        .unwrap_or(DEFAULT_SNAPSHOT_WARMUP);
+    let min_connections = env::var("SOAK_MIN_CONNECTIONS")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .unwrap_or(DEFAULT_MIN_CONNECTIONS);
+    let max_connections = env::var("SOAK_MAX_CONNECTIONS")
+        .ok()
+        .and_then(|val| val.parse().ok())
+        .unwrap_or(DEFAULT_MAX_CONNECTIONS);
+
+    let network = TestNetwork::builder()
+        .gateways(2)
+        .peers(peer_count)
+        .min_connections(min_connections)
+        .max_connections(max_connections)
+        .require_connectivity(connectivity_target)
+        .connectivity_timeout(Duration::from_secs(120))
+        .preserve_temp_dirs_on_failure(true)
+        .preserve_temp_dirs_on_success(true)
+        .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug))
+        .build()
+        .await
+        .context("failed to start soak test network")?;
+
+    println!(
+        "Started soak network with {} gateways and {} peers (run root: {})",
+        2,
+        peer_count,
+        network.run_root().display()
+    );
+    println!(
+        "Min connections: {}, max connections: {} (override via SOAK_MIN_CONNECTIONS / SOAK_MAX_CONNECTIONS)",
+        min_connections, max_connections
+    );
+
+    let riverctl_path = which("riverctl")
+        .context("riverctl not found in PATH; install via `cargo install riverctl`")?;
+
+    let alice_url = format!("{}?encodingProtocol=native", network.gateway(0).ws_url());
+    let bob_url = format!("{}?encodingProtocol=native", network.peer(0).ws_url());
+    let session = RiverSession::initialize(riverctl_path, alice_url, bob_url).await?;
+
+    let snapshots_dir = network.run_root().join("large-soak");
+    fs::create_dir_all(&snapshots_dir)?;
+
+    // Allow topology maintenance to run before the first snapshot.
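+    // The 60s default can be shortened via SOAK_SNAPSHOT_WARMUP_SECS when iterating locally.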
+    println!(
+        "Waiting {:?} before first snapshot to allow topology maintenance to converge",
+        snapshot_warmup
+    );
+    sleep(snapshot_warmup).await;
+
+    let mut iteration = 0usize;
+    let mut next_tick = Instant::now();
+    while iteration < snapshot_iterations {
+        iteration += 1;
+        let snapshot = network.collect_diagnostics().await?;
+        let snapshot_path = snapshots_dir.join(format!("snapshot-{iteration:02}.json"));
+        fs::write(&snapshot_path, to_string_pretty(&snapshot)?)?;
+
+        // Also capture ring topology for visualizing evolution over time.
+        let ring_snapshot = network.ring_snapshot().await?;
+        let ring_path = snapshots_dir.join(format!("ring-{iteration:02}.json"));
+        fs::write(&ring_path, to_string_pretty(&ring_snapshot)?)?;
+
+        let healthy = snapshot
+            .peers
+            .iter()
+            .filter(|peer| peer.error.is_none() && !peer.connected_peer_ids.is_empty())
+            .count();
+        let ratio = healthy as f64 / snapshot.peers.len().max(1) as f64;
+        println!(
+            "Snapshot {iteration}/{snapshot_iterations}: {:.1}% peers healthy ({} / {}), wrote {}",
+            ratio * 100.0,
+            healthy,
+            snapshot.peers.len(),
+            snapshot_path.display()
+        );
+        ensure!(
+            ratio >= connectivity_target,
+            "Connectivity dropped below {:.0}% (actual: {:.1}%). Inspect {}",
+            connectivity_target * 100.0,
+            ratio * 100.0,
+            snapshot_path.display()
+        );
+
+        // Exercise River application flows to ensure contracts stay responsive.
+        session
+            .send_message(
+                RiverUser::Alice,
+                &format!("Large soak heartbeat {} from Alice", iteration),
+            )
+            .await?;
+        session
+            .send_message(
+                RiverUser::Bob,
+                &format!("Large soak heartbeat {} from Bob", iteration),
+            )
+            .await?;
+        session.list_messages(RiverUser::Alice).await?;
+
+        next_tick += snapshot_interval;
+        let now = Instant::now();
+        if next_tick > now {
+            sleep(next_tick - now).await;
+        }
+    }
+
+    println!(
+        "Large network soak complete; inspect {} for diagnostics snapshots",
+        snapshots_dir.display()
+    );
+    Ok(())
+}
+
+struct RiverSession {
+    riverctl: PathBuf,
+    alice_dir: TempDir,
+    bob_dir: TempDir,
+    alice_url: String,
+    bob_url: String,
+    room_key: String,
+    invite_regex: Regex,
+    room_regex: Regex,
+}
+
+#[derive(Clone, Copy, Debug)]
+enum RiverUser {
+    Alice,
+    Bob,
+}
+
+impl RiverSession {
+    async fn initialize(
+        riverctl: PathBuf,
+        alice_url: String,
+        bob_url: String,
+    ) -> anyhow::Result<Self> {
+        let alice_dir = TempDir::new().context("failed to create Alice temp config dir")?;
+        let bob_dir = TempDir::new().context("failed to create Bob temp config dir")?;
+
+        let mut session = Self {
+            riverctl,
+            alice_dir,
+            bob_dir,
+            alice_url,
+            bob_url,
+            room_key: String::new(),
+            invite_regex: Regex::new(r"[A-Za-z0-9+/=]{40,}").unwrap(),
+            room_regex: Regex::new(r"[A-Za-z0-9]{40,}").unwrap(),
+        };
+
+        session.setup_room().await?;
+        Ok(session)
+    }
+
+    async fn setup_room(&mut self) -> anyhow::Result<()> {
+        let create_output = self
+            .run_riverctl(
+                RiverUser::Alice,
+                &[
+                    "room",
+                    "create",
+                    "--name",
+                    "large-network-soak",
+                    "--nickname",
+                    "Alice",
+                ],
+            )
+            .await?;
+        self.room_key = self
+            .room_regex
+            .find(&create_output)
+            .map(|m| m.as_str().to_string())
+            .ok_or_else(|| anyhow!("failed to parse room owner key from riverctl output"))?;
+
+        let invite_output = self
+            .run_riverctl(
+                RiverUser::Alice,
+                &["invite", "create", self.room_key.as_str()],
+            )
+            .await?;
+        let invitation_code = self
+            .invite_regex
+            .find_iter(&invite_output)
+            .filter(|m| m.as_str() != self.room_key)
+            .last()
+            .map(|m| m.as_str().to_string())
+            .ok_or_else(|| anyhow!("failed to parse invitation code from riverctl output"))?;
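+
+        // Bob redeems the invitation from his own node to join the room Alice created.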
+        self.run_riverctl(
+            RiverUser::Bob,
+            &["invite", "accept", &invitation_code, "--nickname", "Bob"],
+        )
+        .await?;
+
+        self.send_message(RiverUser::Alice, "Soak test initialized")
+            .await?;
+        self.send_message(RiverUser::Bob, "Bob joined the soak test")
+            .await?;
+        Ok(())
+    }
+
+    async fn send_message(&self, user: RiverUser, body: &str) -> anyhow::Result<()> {
+        self.run_riverctl(user, &["message", "send", self.room_key.as_str(), body])
+            .await
+            .map(|_| ())
+    }
+
+    async fn list_messages(&self, user: RiverUser) -> anyhow::Result<()> {
+        self.run_riverctl(user, &["message", "list", self.room_key.as_str()])
+            .await
+            .map(|_| ())
+    }
+
+    async fn run_riverctl(&self, user: RiverUser, args: &[&str]) -> anyhow::Result<String> {
+        let (url, config_dir) = match user {
+            RiverUser::Alice => (&self.alice_url, self.alice_dir.path()),
+            RiverUser::Bob => (&self.bob_url, self.bob_dir.path()),
+        };
+
+        const MAX_RETRIES: usize = 3;
+        const RETRY_DELAY: Duration = Duration::from_secs(5);
+
+        for attempt in 1..=MAX_RETRIES {
+            let mut cmd = tokio::process::Command::new(&self.riverctl);
+            cmd.arg("--node-url").arg(url);
+            cmd.args(args);
+            cmd.env("RIVER_CONFIG_DIR", config_dir);
+
+            let output = cmd
+                .output()
+                .await
+                .context("failed to execute riverctl command")?;
+            if output.status.success() {
+                return Ok(String::from_utf8_lossy(&output.stdout).to_string());
+            }
+
+            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+            let retriable = stderr.contains("Timeout waiting for")
+                || stderr.contains("connection refused")
+                || stderr.contains("HTTP request failed");
+            if attempt == MAX_RETRIES || !retriable {
+                bail!("riverctl failed (user {:?}): {}", user, stderr);
+            }
+            println!(
+                "riverctl attempt {}/{} failed for {:?}: {}; retrying in {}s",
+                attempt,
+                MAX_RETRIES,
+                user,
+                stderr.trim(),
+                RETRY_DELAY.as_secs()
+            );
+            sleep(RETRY_DELAY).await;
+        }
+
+        unreachable!("riverctl retry loop should always return or bail")
+    }
+}
diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs
index e200a2e6b..5224682e4 100644
--- a/crates/core/tests/operations.rs
+++ b/crates/core/tests/operations.rs
@@ -93,6 +93,8 @@ async fn base_node_test_config(
         ignore_protocol_checking: true,
         address: Some(Ipv4Addr::LOCALHOST.into()),
         network_port: public_port,
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,
diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs
index f433ec932..3373d5cd0 100644
--- a/crates/core/tests/test_network_integration.rs
+++ b/crates/core/tests/test_network_integration.rs
@@ -8,23 +8,16 @@
 use testresult::TestResult;
 use tokio_tungstenite::connect_async;
 
 // Helper to get or create network
-async fn get_network() -> &'static TestNetwork {
-    use tokio::sync::OnceCell;
-    static NETWORK: OnceCell<TestNetwork> = OnceCell::const_new();
-
-    NETWORK
-        .get_or_init(|| async {
-            TestNetwork::builder()
-                .gateways(1)
-                .peers(2)
-                .binary(freenet_test_network::FreenetBinary::CurrentCrate(
-                    freenet_test_network::BuildProfile::Debug,
-                ))
-                .build()
-                .await
-                .expect("Failed to start test network")
-        })
+async fn get_network() -> TestNetwork {
+    TestNetwork::builder()
+        .gateways(1)
+        .peers(2)
+        .binary(freenet_test_network::FreenetBinary::CurrentCrate(
+            freenet_test_network::BuildProfile::Debug,
+        ))
+        .build()
         .await
+        .expect("Failed to start test network")
 }
 
 #[tokio::test]
diff --git a/crates/core/tests/token_expiration.rs b/crates/core/tests/token_expiration.rs
index e51f10cf1..ff37cc72a 100644
--- a/crates/core/tests/token_expiration.rs
+++ b/crates/core/tests/token_expiration.rs
@@ -38,6 +38,8 @@ async fn create_test_config(
         network_api: NetworkArgs {
             address: Some(Ipv4Addr::LOCALHOST.into()),
             network_port: Some(network_socket.local_addr()?.port()),
+            transient_budget: None,
+            transient_ttl_secs: None,
             ..Default::default()
         },
         config_paths: freenet::config::ConfigPathsArgs {
@@ -130,6 +132,8 @@ async fn test_default_token_configuration() -> TestResult {
         network_api: NetworkArgs {
             address: Some(Ipv4Addr::LOCALHOST.into()),
             network_port: Some(network_socket.local_addr()?.port()),
+            transient_budget: None,
+            transient_ttl_secs: None,
             ..Default::default()
         },
         config_paths: freenet::config::ConfigPathsArgs {
diff --git a/crates/core/tests/ubertest.rs b/crates/core/tests/ubertest.rs
index 5ed7fc2ef..de7ed05c3 100644
--- a/crates/core/tests/ubertest.rs
+++ b/crates/core/tests/ubertest.rs
@@ -186,6 +186,8 @@ async fn create_peer_config(
         ignore_protocol_checking: true,
         address: Some(peer_ip.into()),
         network_port: Some(network_port),
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,
diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs
index e7a7f76a7..cc9fc454b 100644
--- a/crates/freenet-macros/src/codegen.rs
+++ b/crates/freenet-macros/src/codegen.rs
@@ -147,6 +147,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream {
         ignore_protocol_checking: true,
         address: Some(std::net::Ipv4Addr::LOCALHOST.into()),
         network_port: Some(network_port),
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,
@@ -259,6 +261,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream {
         ignore_protocol_checking: true,
         address: Some(std::net::Ipv4Addr::LOCALHOST.into()),
         network_port: Some(network_port),
+        min_connections: None,
+        max_connections: None,
         bandwidth_limit: None,
         blocked_addresses: None,
         transient_budget: None,