From 5ffb5f4c1d0872ee9102ddc820b021678e0b418d Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 15:39:32 -0600 Subject: [PATCH 01/18] fix: track transient connections separately --- .../src/node/network_bridge/p2p_protoc.rs | 20 ++++++++++++------- crates/core/src/ring/connection_manager.rs | 7 +++++-- crates/core/src/ring/mod.rs | 1 - crates/core/tests/token_expiration.rs | 4 ++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 1284e2978..e93b6cd4f 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::net::UdpSocket; use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender}; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; use tracing::Instrument; use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge}; @@ -1268,14 +1268,13 @@ impl P2pConnManager { ); } - // If we already have a transport channel, reuse it instead of dialing again. This covers - // transient->normal promotion without tripping duplicate connection errors. + // If a transient transport already exists, promote it without dialing anew. 
if self.connections.contains_key(&peer) { tracing::info!( tx = %tx, remote = %peer, courtesy, - "connect_peer: reusing existing transport" + "connect_peer: reusing existing transport / promoting transient if present" ); let connection_manager = &self.bridge.op_manager.ring.connection_manager; if let Some(entry) = connection_manager.drop_transient(&peer) { @@ -1290,8 +1289,11 @@ impl P2pConnManager { tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } + let resolved_peer_id = connection_manager + .get_peer_key() + .expect("peer key should be set"); callback - .send_result(Ok((peer.clone(), None))) + .send_result(Ok((resolved_peer_id, None))) .await .inspect_err(|err| { tracing::debug!( @@ -1711,14 +1713,18 @@ impl P2pConnManager { let cm = connection_manager.clone(); let peer = peer_id.clone(); tokio::spawn(async move { - tokio::time::sleep(ttl).await; + sleep(ttl).await; if cm.drop_transient(&peer).is_some() { tracing::info!(%peer, "Transient connection expired; dropping"); if let Err(err) = drop_tx .send(Right(NodeEvent::DropConnection(peer.clone()))) .await { - tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient"); + tracing::warn!( + %peer, + ?err, + "Failed to dispatch DropConnection for expired transient" + ); } } }); diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 5e3f19240..28eb51e58 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -2,12 +2,13 @@ use dashmap::DashMap; use parking_lot::Mutex; use rand::prelude::IndexedRandom; use std::collections::{btree_map::Entry, BTreeMap}; +use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; use crate::topology::{Limits, TopologyManager}; use super::*; -use std::time::{Duration, Instant}; #[derive(Clone)] pub(crate) struct TransientEntry { @@ -405,7 +406,6 @@ impl ConnectionManager { 
removed } - #[allow(dead_code)] pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } @@ -577,6 +577,9 @@ impl ConnectionManager { let connections = self.connections_by_location.read(); let peers = connections.values().filter_map(|conns| { let conn = conns.choose(&mut rand::rng())?; + if self.is_transient(&conn.location.peer) { + return None; + } if let Some(requester) = requesting { if requester == &conn.location.peer { return None; diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 85a875bdd..7c68f0248 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -4,7 +4,6 @@ //! and routes requests to the optimal peers. use std::collections::{BTreeSet, HashSet}; -use std::net::SocketAddr; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, diff --git a/crates/core/tests/token_expiration.rs b/crates/core/tests/token_expiration.rs index e51f10cf1..ff37cc72a 100644 --- a/crates/core/tests/token_expiration.rs +++ b/crates/core/tests/token_expiration.rs @@ -38,6 +38,8 @@ async fn create_test_config( network_api: NetworkArgs { address: Some(Ipv4Addr::LOCALHOST.into()), network_port: Some(network_socket.local_addr()?.port()), + transient_budget: None, + transient_ttl_secs: None, ..Default::default() }, config_paths: freenet::config::ConfigPathsArgs { @@ -130,6 +132,8 @@ async fn test_default_token_configuration() -> TestResult { network_api: NetworkArgs { address: Some(Ipv4Addr::LOCALHOST.into()), network_port: Some(network_socket.local_addr()?.port()), + transient_budget: None, + transient_ttl_secs: None, ..Default::default() }, config_paths: freenet::config::ConfigPathsArgs { From 432b5ffb9863e36dd44fee53f0dcf1fb8761e850 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 15:40:43 -0600 Subject: [PATCH 02/18] fix: tidy transient registry formatting --- crates/core/src/ring/connection_manager.rs | 16 ++++++++++++++++ 1 file changed, 16 
insertions(+) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 28eb51e58..88794c1da 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -393,6 +393,14 @@ impl ConnectionManager { true } + /// Registers (or updates) a transient connection without performing budget checks. + /// Used when the caller already reserved budget via `try_register_transient`. + pub fn register_transient(&self, peer: PeerId, location: Option) { + if !self.try_register_transient(peer.clone(), location) { + tracing::warn!(%peer, "register_transient: budget exhausted while updating"); + } + } + /// Drops a transient connection and returns its metadata, if it existed. /// Also decrements the transient budget counter. pub fn drop_transient(&self, peer: &PeerId) -> Option { @@ -406,10 +414,18 @@ impl ConnectionManager { removed } + pub fn deregister_transient(&self, peer: &PeerId) -> Option { + self.drop_transient(peer) + } + pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } + pub fn is_transient_peer(&self, peer: &PeerId) -> bool { + self.is_transient(peer) + } + pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) } From 8248032f68dc6ca46becf7c9d218426da5de5937 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 17:13:34 -0600 Subject: [PATCH 03/18] fix: clean transient promotion handling --- crates/core/src/node/network_bridge/p2p_protoc.rs | 12 +++++++++--- crates/core/src/ring/connection_manager.rs | 8 +++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index e93b6cd4f..c069ef5e4 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1289,9 +1289,8 @@ impl P2pConnManager { tracing::info!(tx = %tx, 
remote = %peer, "connect_peer: promoted transient"); } - let resolved_peer_id = connection_manager - .get_peer_key() - .expect("peer key should be set"); + // Return the remote peer we are connected to (not our own peer key). + let resolved_peer_id = peer.clone(); callback .send_result(Ok((resolved_peer_id, None))) .await @@ -1612,8 +1611,15 @@ impl P2pConnManager { current = connection_manager.transient_count(), "Transient connection budget exhausted; dropping inbound connection" ); + if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) { + for mut cb in callbacks { + let _ = cb.send_result(Err(())).await; + } + } + state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } + } let pending_txs = state .awaiting_connection_txs diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 88794c1da..3787387cd 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -14,6 +14,7 @@ use super::*; pub(crate) struct TransientEntry { #[allow(dead_code)] pub opened_at: Instant, + /// Advertised location for the transient peer, if known at admission time. pub location: Option, } @@ -393,7 +394,7 @@ impl ConnectionManager { true } - /// Registers (or updates) a transient connection without performing budget checks. + /// Record a transient connection for bookkeeping (kept out of routing/topology counts). /// Used when the caller already reserved budget via `try_register_transient`. pub fn register_transient(&self, peer: PeerId, location: Option) { if !self.try_register_transient(peer.clone(), location) { @@ -414,10 +415,12 @@ impl ConnectionManager { removed } + /// Remove transient tracking for a peer, returning any stored metadata. pub fn deregister_transient(&self, peer: &PeerId) -> Option { self.drop_transient(peer) } + /// Check whether a peer is currently tracked as transient. 
pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } @@ -426,14 +429,17 @@ impl ConnectionManager { self.is_transient(peer) } + /// Current number of tracked transient connections. pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) } + /// Maximum transient slots allowed. pub fn transient_budget(&self) -> usize { self.transient_budget } + /// Time-to-live for transients before automatic drop. pub fn transient_ttl(&self) -> Duration { self.transient_ttl } From 33e3ceee466f6409e303fba905865c860e6bb04e Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 18 Nov 2025 17:16:14 -0600 Subject: [PATCH 04/18] fix: honor transient budget and promote correctly --- crates/core/src/node/network_bridge/p2p_protoc.rs | 15 +++++++++++++-- crates/core/src/ring/connection_manager.rs | 10 +++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index c069ef5e4..7c2f47a89 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1619,8 +1619,6 @@ impl P2pConnManager { state.awaiting_connection_txs.remove(&peer_id.addr); return Ok(()); } - } - let pending_txs = state .awaiting_connection_txs .remove(&peer_id.addr) @@ -1683,6 +1681,19 @@ impl P2pConnManager { // Only insert if connection doesn't already exist to avoid dropping existing channel let mut newly_inserted = false; if !self.connections.contains_key(&peer_id) { + if is_transient { + let cm = &self.bridge.op_manager.ring.connection_manager; + let current = cm.transient_count(); + if current >= cm.transient_budget() { + tracing::warn!( + remote = %peer_id.addr, + budget = cm.transient_budget(), + current, + "Transient connection budget exhausted; dropping inbound connection before insert" + ); + return Ok(()); + } + } let (tx, rx) = mpsc::channel(10); 
tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 3787387cd..6594330f3 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -12,6 +12,8 @@ use super::*; #[derive(Clone)] pub(crate) struct TransientEntry { + /// Entry tracking a transient connection that hasn't been added to the ring topology yet. + /// Transient connections are typically unsolicited inbound connections to gateways. #[allow(dead_code)] pub opened_at: Instant, /// Advertised location for the transient peer, if known at admission time. @@ -394,8 +396,8 @@ impl ConnectionManager { true } - /// Record a transient connection for bookkeeping (kept out of routing/topology counts). - /// Used when the caller already reserved budget via `try_register_transient`. + /// Registers a new transient connection that is not yet part of the ring topology. + /// Transient connections are tracked separately and subject to budget and TTL limits. pub fn register_transient(&self, peer: PeerId, location: Option) { if !self.try_register_transient(peer.clone(), location) { tracing::warn!(%peer, "register_transient: budget exhausted while updating"); @@ -415,7 +417,9 @@ impl ConnectionManager { removed } - /// Remove transient tracking for a peer, returning any stored metadata. + /// Deregisters a transient connection, removing it from tracking. + /// + /// Returns the removed entry if it existed. 
pub fn deregister_transient(&self, peer: &PeerId) -> Option { self.drop_transient(peer) } From 0f38826efbe1f6a7ab5ea3d7defd7887078b608f Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 13:02:23 -0600 Subject: [PATCH 05/18] fix: remove unused transient helpers --- crates/core/src/ring/connection_manager.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 6594330f3..d361e2ca2 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -396,14 +396,6 @@ impl ConnectionManager { true } - /// Registers a new transient connection that is not yet part of the ring topology. - /// Transient connections are tracked separately and subject to budget and TTL limits. - pub fn register_transient(&self, peer: PeerId, location: Option) { - if !self.try_register_transient(peer.clone(), location) { - tracing::warn!(%peer, "register_transient: budget exhausted while updating"); - } - } - /// Drops a transient connection and returns its metadata, if it existed. /// Also decrements the transient budget counter. pub fn drop_transient(&self, peer: &PeerId) -> Option { @@ -417,22 +409,11 @@ impl ConnectionManager { removed } - /// Deregisters a transient connection, removing it from tracking. - /// - /// Returns the removed entry if it existed. - pub fn deregister_transient(&self, peer: &PeerId) -> Option { - self.drop_transient(peer) - } - /// Check whether a peer is currently tracked as transient. pub fn is_transient(&self, peer: &PeerId) -> bool { self.transient_connections.contains_key(peer) } - pub fn is_transient_peer(&self, peer: &PeerId) -> bool { - self.is_transient(peer) - } - /// Current number of tracked transient connections. 
pub fn transient_count(&self) -> usize { self.transient_in_use.load(Ordering::Acquire) From 29fe5b07059af6438e3de9169337714d2b746fb4 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 15:54:10 -0600 Subject: [PATCH 06/18] test: add large network soak test with diagnostic snapshots --- AGENTS.md | 1 + crates/core/Cargo.toml | 3 +- crates/core/tests/large_network.rs | 275 +++++++++++++++++++++++++++++ 3 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 crates/core/tests/large_network.rs diff --git a/AGENTS.md b/AGENTS.md index 81c864d48..54150a2ee 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -157,6 +157,7 @@ Run these in any worktree before pushing a branch or opening a PR. ``` - Tests can share the static network and access `NETWORK.gateway(0).ws_url()` to communicate via `freenet_stdlib::client_api::WebApi`. - Run the crate’s suite with `cargo test -p freenet-test-network`. When `preserve_temp_dirs_on_failure(true)` is set, failing startups keep logs under `/tmp/freenet-test-network-/` for inspection. +- A larger soak test lives in `crates/core/tests/large_network.rs`. It is `#[ignore]` by default—run it manually with `cargo test -p freenet --test large_network -- --ignored --nocapture` once you have `riverctl` installed. The test writes diagnostics snapshots to the network’s `run_root()` directory for later analysis. 
## Pull Requests & Reviews diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 803670669..bfea19de1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -90,7 +90,7 @@ arbitrary = { features = ["derive"], version = "1" } chrono = { features = ["arbitrary"], workspace = true } freenet-stdlib = { features = ["net", "testing"], workspace = true } freenet-macros = { path = "../freenet-macros" } -freenet-test-network = "0.1.1" +freenet-test-network = { version = "0.1.1", path = "../../../../freenet-test-network" } httptest = "0.16" statrs = "0.18" tempfile = "3" @@ -100,6 +100,7 @@ tokio-tungstenite = "0.27.0" # console-subscriber = { version = "0.4" } ureq = { version = "3.1", features = ["json"] } which = "8.0" +regex = "1" [features] default = ["redb", "trace", "websocket"] diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs new file mode 100644 index 000000000..b60c86798 --- /dev/null +++ b/crates/core/tests/large_network.rs @@ -0,0 +1,275 @@ +//! Large-scale soak test using `freenet-test-network`. +//! +//! This test intentionally spins up a sizable network (2 gateways + N peers) and exercises the +//! cluster for several minutes while capturing diagnostics snapshots and running River client +//! workflows via `riverctl`. +//! +//! ## Running Manually +//! ```text +//! cargo test -p freenet --test large_network -- --ignored --nocapture +//! ``` +//! Environment overrides: +//! - `SOAK_PEER_COUNT` – number of non-gateway peers (default: 38). +//! - `SOAK_SNAPSHOT_INTERVAL_SECS` – seconds between diagnostics snapshots (default: 60). +//! - `SOAK_SNAPSHOT_ITERATIONS` – number of snapshots to capture (default: 5). +//! - `SOAK_CONNECTIVITY_TARGET` – minimum ratio of peers that must report >=1 connection (default: 0.75). +//! +//! Requirements: +//! - `riverctl` must be installed and in PATH (`cargo install riverctl`). +//! - Enough CPU/RAM to host ~40 peers locally. +//! +//! 
The snapshots are stored under the network's `run_root()/large-soak/` directory for later +//! inspection or visualization. + +use anyhow::{anyhow, bail, ensure, Context}; +use freenet_test_network::{BuildProfile, FreenetBinary, TestNetwork}; +use regex::Regex; +use serde_json::to_string_pretty; +use std::{ + env, fs, + path::PathBuf, + time::{Duration, Instant}, +}; +use tempfile::TempDir; +use tokio::time::sleep; +use which::which; + +const DEFAULT_PEER_COUNT: usize = 38; +const DEFAULT_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60); +const DEFAULT_SNAPSHOT_ITERATIONS: usize = 5; +const DEFAULT_CONNECTIVITY_TARGET: f64 = 0.75; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore = "Large soak test - run manually (see file header for instructions)"] +async fn large_network_soak() -> anyhow::Result<()> { + let peer_count = env::var("SOAK_PEER_COUNT") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_PEER_COUNT); + let snapshot_interval = env::var("SOAK_SNAPSHOT_INTERVAL_SECS") + .ok() + .and_then(|val| val.parse().ok()) + .map(Duration::from_secs) + .unwrap_or(DEFAULT_SNAPSHOT_INTERVAL); + let snapshot_iterations = env::var("SOAK_SNAPSHOT_ITERATIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_SNAPSHOT_ITERATIONS); + let connectivity_target = env::var("SOAK_CONNECTIVITY_TARGET") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_CONNECTIVITY_TARGET); + + let network = TestNetwork::builder() + .gateways(2) + .peers(peer_count) + .require_connectivity(connectivity_target) + .connectivity_timeout(Duration::from_secs(120)) + .preserve_temp_dirs_on_failure(true) + .preserve_temp_dirs_on_success(true) + .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug)) + .build() + .await + .context("failed to start soak test network")?; + + println!( + "Started soak network with {} gateways and {} peers (run root: {})", + 2, + peer_count, + network.run_root().display() + ); + + let riverctl_path = 
which("riverctl") + .context("riverctl not found in PATH; install via `cargo install riverctl`")?; + + let alice_url = format!("{}?encodingProtocol=native", network.gateway(0).ws_url()); + let bob_url = format!("{}?encodingProtocol=native", network.peer(0).ws_url()); + let session = RiverSession::initialize(riverctl_path, alice_url, bob_url).await?; + + let snapshots_dir = network.run_root().join("large-soak"); + fs::create_dir_all(&snapshots_dir)?; + + let mut iteration = 0usize; + let mut next_tick = Instant::now(); + while iteration < snapshot_iterations { + iteration += 1; + let snapshot = network.collect_diagnostics().await?; + let snapshot_path = snapshots_dir.join(format!("snapshot-{iteration:02}.json")); + fs::write(&snapshot_path, to_string_pretty(&snapshot)?)?; + + let healthy = snapshot + .peers + .iter() + .filter(|peer| peer.error.is_none() && !peer.connected_peer_ids.is_empty()) + .count(); + let ratio = healthy as f64 / snapshot.peers.len().max(1) as f64; + println!( + "Snapshot {iteration}/{snapshot_iterations}: {:.1}% peers healthy ({} / {}), wrote {}", + ratio * 100.0, + healthy, + snapshot.peers.len(), + snapshot_path.display() + ); + ensure!( + ratio >= connectivity_target, + "Connectivity dropped below {:.0}% (actual: {:.1}%). Inspect {}", + connectivity_target * 100.0, + ratio * 100.0, + snapshot_path.display() + ); + + // Exercise River application flows to ensure contracts stay responsive. 
+ session + .send_message( + RiverUser::Alice, + &format!("Large soak heartbeat {} from Alice", iteration), + ) + .await?; + session + .send_message( + RiverUser::Bob, + &format!("Large soak heartbeat {} from Bob", iteration), + ) + .await?; + session.list_messages(RiverUser::Alice).await?; + + next_tick += snapshot_interval; + let now = Instant::now(); + if next_tick > now { + sleep(next_tick - now).await; + } + } + + println!( + "Large network soak complete; inspect {} for diagnostics snapshots", + snapshots_dir.display() + ); + Ok(()) +} + +struct RiverSession { + riverctl: PathBuf, + alice_dir: TempDir, + bob_dir: TempDir, + alice_url: String, + bob_url: String, + room_key: String, + invite_regex: Regex, + room_regex: Regex, +} + +#[derive(Clone, Copy, Debug)] +enum RiverUser { + Alice, + Bob, +} + +impl RiverSession { + async fn initialize( + riverctl: PathBuf, + alice_url: String, + bob_url: String, + ) -> anyhow::Result { + let alice_dir = TempDir::new().context("failed to create Alice temp config dir")?; + let bob_dir = TempDir::new().context("failed to create Bob temp config dir")?; + + let mut session = Self { + riverctl, + alice_dir, + bob_dir, + alice_url, + bob_url, + room_key: String::new(), + invite_regex: Regex::new(r"[A-Za-z0-9+/=]{40,}").unwrap(), + room_regex: Regex::new(r"[A-Za-z0-9]{40,}").unwrap(), + }; + + session.setup_room().await?; + Ok(session) + } + + async fn setup_room(&mut self) -> anyhow::Result<()> { + let create_output = self + .run_riverctl( + RiverUser::Alice, + &[ + "room", + "create", + "--name", + "large-network-soak", + "--nickname", + "Alice", + ], + ) + .await?; + self.room_key = self + .room_regex + .find(&create_output) + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse room owner key from riverctl output"))?; + + let invite_output = self + .run_riverctl( + RiverUser::Alice, + &["invite", "create", self.room_key.as_str()], + ) + .await?; + let invitation_code = self + .invite_regex + 
.find_iter(&invite_output) + .filter(|m| m.as_str() != self.room_key) + .last() + .map(|m| m.as_str().to_string()) + .ok_or_else(|| anyhow!("failed to parse invitation code from riverctl output"))?; + + self.run_riverctl( + RiverUser::Bob, + &["invite", "accept", &invitation_code, "--nickname", "Bob"], + ) + .await?; + + self.send_message(RiverUser::Alice, "Soak test initialized") + .await?; + self.send_message(RiverUser::Bob, "Bob joined the soak test") + .await?; + Ok(()) + } + + async fn send_message(&self, user: RiverUser, body: &str) -> anyhow::Result<()> { + self.run_riverctl(user, &["message", "send", self.room_key.as_str(), body]) + .await + .map(|_| ()) + } + + async fn list_messages(&self, user: RiverUser) -> anyhow::Result<()> { + self.run_riverctl(user, &["message", "list", self.room_key.as_str()]) + .await + .map(|_| ()) + } + + async fn run_riverctl<'a>(&self, user: RiverUser, args: &[&'a str]) -> anyhow::Result { + let (url, config_dir) = match user { + RiverUser::Alice => (&self.alice_url, self.alice_dir.path()), + RiverUser::Bob => (&self.bob_url, self.bob_dir.path()), + }; + + let mut cmd = tokio::process::Command::new(&self.riverctl); + cmd.arg("--node-url").arg(url); + cmd.args(args); + cmd.env("RIVER_CONFIG_DIR", config_dir); + + let output = cmd + .output() + .await + .context("failed to execute riverctl command")?; + if !output.status.success() { + bail!( + "riverctl failed (user {:?}): {}", + user, + String::from_utf8_lossy(&output.stderr) + ); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) + } +} From 83daf8978702134bb4cbbb1bc03ac4d527a08450 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 15:55:20 -0600 Subject: [PATCH 07/18] test: add large network soak test with diagnostic snapshots --- Cargo.lock | 47 +++++++++++++++---- crates/core/tests/large_network.rs | 2 +- crates/core/tests/test_network_integration.rs | 25 ++++------ 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/Cargo.lock 
b/Cargo.lock index 8ddd374c6..38a2c9f91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1227,7 +1227,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1429,7 +1429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1679,6 +1679,7 @@ dependencies = [ "pkcs8", "rand 0.9.2", "redb", + "regex", "reqwest", "rsa", "semver", @@ -1813,15 +1814,15 @@ dependencies = [ [[package]] name = "freenet-test-network" version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d06be6aef3bb0433a963d0cc0c0f9b7d05e50b54fcb929e405fefab10d3b2db9" dependencies = [ "anyhow", "chrono", "freenet-stdlib", "futures 0.3.31", + "regex", "serde", "serde_json", + "ssh2", "sysinfo", "thiserror 1.0.69", "tokio", @@ -2428,7 +2429,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "system-configuration", "tokio", "tower-service", @@ -2680,7 +2681,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2842,6 +2843,20 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libssh2-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "220e4f05ad4a218192533b300327f5150e809b54c4ec83b5a1d91833601811b9" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", +] + [[package]] name = "libunwind" version = "1.3.3" @@ -3184,7 +3199,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + 
"windows-sys 0.61.2", ] [[package]] @@ -4305,7 +4320,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4988,6 +5003,18 @@ dependencies = [ "url", ] +[[package]] +name = "ssh2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f84d13b3b8a0d4e91a2629911e951db1bb8671512f5c09d7d4ba34500ba68c8" +dependencies = [ + "bitflags 2.10.0", + "libc", + "libssh2-sys", + "parking_lot", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -5166,7 +5193,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6416,7 +6443,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs index b60c86798..d84c2c2d3 100644 --- a/crates/core/tests/large_network.rs +++ b/crates/core/tests/large_network.rs @@ -247,7 +247,7 @@ impl RiverSession { .map(|_| ()) } - async fn run_riverctl<'a>(&self, user: RiverUser, args: &[&'a str]) -> anyhow::Result { + async fn run_riverctl(&self, user: RiverUser, args: &[&str]) -> anyhow::Result { let (url, config_dir) = match user { RiverUser::Alice => (&self.alice_url, self.alice_dir.path()), RiverUser::Bob => (&self.bob_url, self.bob_dir.path()), diff --git a/crates/core/tests/test_network_integration.rs b/crates/core/tests/test_network_integration.rs index f433ec932..3373d5cd0 100644 --- a/crates/core/tests/test_network_integration.rs +++ b/crates/core/tests/test_network_integration.rs @@ -8,23 +8,16 @@ use testresult::TestResult; use tokio_tungstenite::connect_async; // Helper to get or create network -async fn get_network() -> &'static TestNetwork { - use tokio::sync::OnceCell; - static 
NETWORK: OnceCell = OnceCell::const_new(); - - NETWORK - .get_or_init(|| async { - TestNetwork::builder() - .gateways(1) - .peers(2) - .binary(freenet_test_network::FreenetBinary::CurrentCrate( - freenet_test_network::BuildProfile::Debug, - )) - .build() - .await - .expect("Failed to start test network") - }) +async fn get_network() -> TestNetwork { + TestNetwork::builder() + .gateways(1) + .peers(2) + .binary(freenet_test_network::FreenetBinary::CurrentCrate( + freenet_test_network::BuildProfile::Debug, + )) + .build() .await + .expect("Failed to start test network") } #[tokio::test] From 8bdfe184be43f75be4f8fcd5c92590b23909e2ef Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 16:12:45 -0600 Subject: [PATCH 08/18] test: harden soak riverctl retries --- crates/core/tests/large_network.rs | 44 +++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs index d84c2c2d3..20970a647 100644 --- a/crates/core/tests/large_network.rs +++ b/crates/core/tests/large_network.rs @@ -253,23 +253,41 @@ impl RiverSession { RiverUser::Bob => (&self.bob_url, self.bob_dir.path()), }; - let mut cmd = tokio::process::Command::new(&self.riverctl); - cmd.arg("--node-url").arg(url); - cmd.args(args); - cmd.env("RIVER_CONFIG_DIR", config_dir); + const MAX_RETRIES: usize = 3; + const RETRY_DELAY: Duration = Duration::from_secs(5); - let output = cmd - .output() - .await - .context("failed to execute riverctl command")?; - if !output.status.success() { - bail!( - "riverctl failed (user {:?}): {}", + for attempt in 1..=MAX_RETRIES { + let mut cmd = tokio::process::Command::new(&self.riverctl); + cmd.arg("--node-url").arg(url); + cmd.args(args); + cmd.env("RIVER_CONFIG_DIR", config_dir); + + let output = cmd + .output() + .await + .context("failed to execute riverctl command")?; + if output.status.success() { + return Ok(String::from_utf8_lossy(&output.stdout).to_string()); 
+ } + + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let retriable = stderr.contains("Timeout waiting for") + || stderr.contains("connection refused") + || stderr.contains("HTTP request failed"); + if attempt == MAX_RETRIES || !retriable { + bail!("riverctl failed (user {:?}): {}", user, stderr); + } + println!( + "riverctl attempt {}/{} failed for {:?}: {}; retrying in {}s", + attempt, + MAX_RETRIES, user, - String::from_utf8_lossy(&output.stderr) + stderr.trim(), + RETRY_DELAY.as_secs() ); + sleep(RETRY_DELAY).await; } - Ok(String::from_utf8_lossy(&output.stdout).to_string()) + unreachable!("riverctl retry loop should always return or bail") } } From 0c10e743f6388e5c8b00e3a30834d7ed51a33262 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 21:35:00 -0600 Subject: [PATCH 09/18] refactor: rename courtesy links to transient --- .../core/src/node/network_bridge/handshake.rs | 92 +++++++++++++------ .../src/node/network_bridge/p2p_protoc.rs | 64 +++++++------ crates/core/src/operations/connect.rs | 62 ++++++------- crates/core/src/ring/connection_manager.rs | 24 ++++- crates/core/src/ring/mod.rs | 7 +- 5 files changed, 155 insertions(+), 94 deletions(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index d1c56b285..c2f3c8b31 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -5,7 +5,7 @@ //! connection attempts to/from the event loop. Higher-level routing decisions now live inside //! `ConnectOp`. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -31,21 +31,21 @@ pub(crate) enum Event { transaction: Option, peer: Option, connection: PeerConnection, - courtesy: bool, + transient: bool, }, /// An outbound connection attempt succeeded. 
OutboundEstablished { transaction: Transaction, peer: PeerId, connection: PeerConnection, - courtesy: bool, + transient: bool, }, /// An outbound connection attempt failed. OutboundFailed { transaction: Transaction, peer: PeerId, error: ConnectionError, - courtesy: bool, + transient: bool, }, } @@ -56,13 +56,13 @@ pub(crate) enum Command { Connect { peer: PeerId, transaction: Transaction, - courtesy: bool, + transient: bool, }, /// Register expectation for an inbound connection from `peer`. ExpectInbound { peer: PeerId, transaction: Option, - courtesy: bool, + transient: bool, }, /// Remove state associated with `peer`. DropConnection { peer: PeerId }, @@ -122,32 +122,64 @@ impl Stream for HandshakeHandler { struct ExpectedInbound { peer: PeerId, transaction: Option, - courtesy: bool, + transient: bool, // TODO: rename to transient in protocol once we migrate terminology } #[derive(Default)] struct ExpectedInboundTracker { - entries: HashMap, + entries: HashMap>, } impl ExpectedInboundTracker { - fn register(&mut self, peer: PeerId, transaction: Option, courtesy: bool) { - self.entries.insert( - peer.addr, - ExpectedInbound { + fn register(&mut self, peer: PeerId, transaction: Option, transient: bool) { + tracing::debug!( + remote = %peer.addr, + transient, + tx = ?transaction, + "ExpectInbound: registering expectation" + ); + self.entries + .entry(peer.addr.ip()) + .or_default() + .push(ExpectedInbound { peer, transaction, - courtesy, - }, - ); + transient, + }); } fn drop_peer(&mut self, peer: &PeerId) { - self.entries.remove(&peer.addr); + if let Some(list) = self.entries.get_mut(&peer.addr.ip()) { + list.retain(|entry| entry.peer != *peer); + if list.is_empty() { + self.entries.remove(&peer.addr.ip()); + } + } } fn consume(&mut self, addr: SocketAddr) -> Option { - self.entries.remove(&addr) + let ip = addr.ip(); + let list = self.entries.get_mut(&ip)?; + if let Some(pos) = list + .iter() + .position(|entry| entry.peer.addr.port() == addr.port()) + { + let 
entry = list.remove(pos); + if list.is_empty() { + self.entries.remove(&ip); + } + tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by exact port"); + return Some(entry); + } + let entry = list.pop(); + if list.is_empty() { + self.entries.remove(&ip); + } + if let Some(entry) = entry { + tracing::debug!(remote = %addr, peer = %entry.peer.addr, transient = entry.transient, tx = ?entry.transaction, "ExpectInbound: matched by IP fallback"); + return Some(entry); + } + None } #[cfg(test)] @@ -170,12 +202,12 @@ async fn run_driver( loop { select! { command = commands_rx.recv() => match command { - Some(Command::Connect { peer, transaction, courtesy }) => { - spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, courtesy, peer_ready.clone()); - } - Some(Command::ExpectInbound { peer, transaction, courtesy }) => { - expected_inbound.register(peer, transaction, courtesy); + Some(Command::Connect { peer, transaction, transient }) => { + spawn_outbound(outbound.clone(), events_tx.clone(), peer, transaction, transient, peer_ready.clone()); } + Some(Command::ExpectInbound { peer, transaction, transient }) => { + expected_inbound.register(peer, transaction, transient /* transient */); + } Some(Command::DropConnection { peer }) => { expected_inbound.drop_peer(&peer); } @@ -190,8 +222,8 @@ async fn run_driver( let remote_addr = conn.remote_addr(); let entry = expected_inbound.consume(remote_addr); - let (peer, transaction, courtesy) = if let Some(entry) = entry { - (Some(entry.peer), entry.transaction, entry.courtesy) + let (peer, transaction, transient) = if let Some(entry) = entry { + (Some(entry.peer), entry.transaction, entry.transient) } else { (None, None, false) }; @@ -200,7 +232,7 @@ async fn run_driver( transaction, peer, connection: conn, - courtesy, + transient, }).await.is_err() { break; } @@ -217,7 +249,7 @@ fn spawn_outbound( events_tx: mpsc::Sender, peer: PeerId, 
transaction: Transaction, - courtesy: bool, + transient: bool, peer_ready: Option>, ) { tokio::spawn(async move { @@ -241,13 +273,13 @@ fn spawn_outbound( transaction, peer: peer.clone(), connection, - courtesy, + transient, }, Err(error) => Event::OutboundFailed { transaction, peer: peer.clone(), error, - courtesy, + transient, }, }; @@ -280,7 +312,7 @@ mod tests { .expect("expected registered inbound entry"); assert_eq!(entry.peer, peer); assert_eq!(entry.transaction, Some(tx)); - assert!(entry.courtesy); + assert!(entry.transient); assert!(tracker.consume(peer.addr).is_none()); } @@ -308,6 +340,6 @@ mod tests { .consume(peer.addr) .expect("entry should be present after overwrite"); assert_eq!(entry.transaction, Some(new_tx)); - assert!(entry.courtesy); + assert!(entry.transient); } } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 7c2f47a89..391560f66 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -665,13 +665,13 @@ impl P2pConnManager { peer, tx, callback, - is_gw: courtesy, + is_gw: transient, } => { tracing::info!( tx = %tx, remote = %peer, remote_addr = %peer.addr, - courtesy, + transient, "NodeEvent::ConnectPeer received" ); ctx.handle_connect_peer( @@ -680,7 +680,7 @@ impl P2pConnManager { tx, &handshake_cmd_sender, &mut state, - courtesy, + transient, ) .await?; } @@ -691,7 +691,7 @@ impl P2pConnManager { .send(HandshakeCommand::ExpectInbound { peer: peer.clone(), transaction: None, - courtesy: false, + transient: false, }) .await { @@ -1208,7 +1208,7 @@ impl P2pConnManager { tx: Transaction, handshake_commands: &HandshakeCommandSender, state: &mut EventListenerState, - courtesy: bool, + transient: bool, ) -> anyhow::Result<()> { let mut peer = peer; let mut peer_addr = peer.addr; @@ -1221,13 +1221,13 @@ impl P2pConnManager { tx = %tx, remote = %peer, fallback_addr = %peer_addr, - courtesy, + transient, 
"ConnectPeer provided unspecified address; using existing connection address" ); } else { tracing::debug!( tx = %tx, - courtesy, + transient, "ConnectPeer received unspecified address without existing connection reference" ); } @@ -1237,7 +1237,7 @@ impl P2pConnManager { tx = %tx, remote = %peer, remote_addr = %peer_addr, - courtesy, + transient, "Connecting to peer" ); if let Some(blocked_addrs) = &self.blocked_addresses { @@ -1273,7 +1273,7 @@ impl P2pConnManager { tracing::info!( tx = %tx, remote = %peer, - courtesy, + transient, "connect_peer: reusing existing transport / promoting transient if present" ); let connection_manager = &self.bridge.op_manager.ring.connection_manager; @@ -1316,7 +1316,7 @@ impl P2pConnManager { tx = %tx, remote = %peer_addr, pending = callbacks.get().len(), - courtesy, + transient, "Connection already pending, queuing additional requester" ); callbacks.get_mut().push(callback); @@ -1325,7 +1325,7 @@ impl P2pConnManager { remote = %peer_addr, pending = callbacks.get().len(), pending_txs = ?txs_entry, - courtesy, + transient, "connect_peer: connection already pending, queued callback" ); return Ok(()); @@ -1336,7 +1336,7 @@ impl P2pConnManager { tracing::debug!( tx = %tx, remote = %peer_addr, - courtesy, + transient, "connect_peer: registering new pending connection" ); entry.insert(vec![callback]); @@ -1345,7 +1345,7 @@ impl P2pConnManager { remote = %peer_addr, pending = 1, pending_txs = ?txs_entry, - courtesy, + transient, "connect_peer: registered new pending connection" ); state.outbound_handler.expect_incoming(peer_addr); @@ -1356,14 +1356,14 @@ impl P2pConnManager { .send(HandshakeCommand::Connect { peer: peer.clone(), transaction: tx, - courtesy, + transient, }) .await { tracing::warn!( tx = %tx, remote = %peer.addr, - courtesy, + transient, ?error, "Failed to enqueue connect command" ); @@ -1378,7 +1378,7 @@ impl P2pConnManager { tx = %tx, remote = %peer_addr, callbacks = callbacks.len(), - courtesy, + transient, "Cleaning up 
callbacks after connect command failure" ); for mut cb in callbacks { @@ -1405,7 +1405,7 @@ impl P2pConnManager { tracing::debug!( tx = %tx, remote = %peer_addr, - courtesy, + transient, "connect_peer: handshake command dispatched" ); } @@ -1424,16 +1424,17 @@ impl P2pConnManager { transaction, peer, connection, - courtesy, + transient, } => { - let conn_manager = &self.bridge.op_manager.ring.connection_manager; + tracing::info!(provided = ?peer, transient = transient, tx = ?transaction, "InboundConnection event"); + let _conn_manager = &self.bridge.op_manager.ring.connection_manager; let remote_addr = connection.remote_addr(); if let Some(blocked_addrs) = &self.blocked_addresses { if blocked_addrs.contains(&remote_addr) { tracing::info!( remote = %remote_addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection blocked by local policy" ); @@ -1441,11 +1442,11 @@ impl P2pConnManager { } } - let provided_peer = peer.clone(); + let _provided_peer = peer.clone(); let peer_id = peer.unwrap_or_else(|| { tracing::info!( remote = %remote_addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection arrived without matching expectation; accepting provisionally" ); @@ -1463,13 +1464,14 @@ impl P2pConnManager { tracing::info!( remote = %peer_id.addr, - courtesy, + transient = transient, transaction = ?transaction, "Inbound connection established" ); - let is_transient = - conn_manager.is_gateway() && provided_peer.is_none() && transaction.is_none(); + // Treat only transient connections as transient. Normal inbound dials (including + // gateway bootstrap from peers) should be promoted into the ring once established. 
+ let is_transient = transient; self.handle_successful_connection(peer_id, connection, state, None, is_transient) .await?; @@ -1478,11 +1480,11 @@ impl P2pConnManager { transaction, peer, connection, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - courtesy, + transient = transient, transaction = %transaction, "Outbound connection established" ); @@ -1493,11 +1495,11 @@ impl P2pConnManager { transaction, peer, error, - courtesy, + transient, } => { tracing::info!( remote = %peer.addr, - courtesy, + transient = transient, transaction = %transaction, ?error, "Outbound connection failed" @@ -1519,7 +1521,7 @@ impl P2pConnManager { remote = %peer.addr, callbacks = callbacks.len(), pending_txs = ?pending_txs, - courtesy, + transient, "Notifying callbacks after outbound failure" ); @@ -1710,9 +1712,11 @@ impl P2pConnManager { } if newly_inserted { + tracing::info!(remote = %peer_id, is_transient, "handle_successful_connection: inserted new connection entry"); let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); if !is_transient { let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); + tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring"); self.bridge .op_manager .ring diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index f0d055715..4d42d560c 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -86,9 +86,9 @@ impl fmt::Display for ConnectMsg { ), ConnectMsg::Response { sender, target, payload, .. } => write!( f, - "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {}, courtesy: {} }}", + "ConnectResponse {{ sender: {sender}, target: {target}, acceptor: {}, transient: {} }}", payload.acceptor, - payload.courtesy + payload.transient ), ConnectMsg::ObservedAddress { target, address, .. 
} => { write!(f, "ObservedAddress {{ target: {target}, address: {address} }}") @@ -126,8 +126,8 @@ pub(crate) struct ConnectRequest { pub(crate) struct ConnectResponse { /// The peer that accepted the join request. pub acceptor: PeerKeyLocation, - /// Whether this acceptance is a short-lived courtesy link. - pub courtesy: bool, + /// Whether this acceptance is a short-lived transient link. + pub transient: bool, } /// New minimal state machine the joiner tracks. @@ -154,7 +154,7 @@ pub(crate) struct RelayState { pub upstream: PeerKeyLocation, pub request: ConnectRequest, pub forwarded_to: Option, - pub courtesy_hint: bool, + pub transient_hint: bool, pub observed_sent: bool, pub accepted_locally: bool, } @@ -175,8 +175,8 @@ pub(crate) trait RelayContext { visited: &[PeerKeyLocation], ) -> Option; - /// Whether the acceptance should be treated as a short-lived courtesy link. - fn courtesy_hint(&self, acceptor: &PeerKeyLocation, joiner: &PeerKeyLocation) -> bool; + /// Whether the acceptance should be treated as a short-lived transient link. + fn transient_hint(&self, acceptor: &PeerKeyLocation, joiner: &PeerKeyLocation) -> bool; } /// Result of processing a request at a relay. 
@@ -215,11 +215,11 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner); - self.courtesy_hint = courtesy; + let transient = ctx.transient_hint(&acceptor, &self.request.joiner); + self.transient_hint = transient; actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), - courtesy, + transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); } @@ -299,10 +299,10 @@ impl RelayContext for RelayEnv<'_> { .routing(desired_location, None, skip, &router) } - fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { + fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { // Courtesy slots still piggyback on regular connections. Flag the first acceptance so the - // joiner can prioritise it, and keep the logic simple until dedicated courtesy tracking - // is wired in (see courtesy-connection-budget branch). + // joiner can prioritise it, and keep the logic simple until dedicated transient tracking + // is wired in (see transient-connection-budget branch). 
self.op_manager.ring.open_connections() == 0 } } @@ -310,7 +310,7 @@ impl RelayContext for RelayEnv<'_> { #[derive(Debug)] pub struct AcceptedPeer { pub peer: PeerKeyLocation, - pub courtesy: bool, + pub transient: bool, } #[derive(Debug, Default)] @@ -331,7 +331,7 @@ impl JoinerState { self.last_progress = now; acceptance.new_acceptor = Some(AcceptedPeer { peer: response.acceptor.clone(), - courtesy: response.courtesy, + transient: response.transient, }); acceptance.assigned_location = self.accepted.len() == 1; } @@ -391,7 +391,7 @@ impl ConnectOp { upstream, request, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, })); @@ -507,7 +507,7 @@ impl ConnectOp { upstream: upstream.clone(), request: request.clone(), forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }))); @@ -666,7 +666,7 @@ impl Operation for ConnectOp { peer: new_acceptor.peer.peer.clone(), tx: self.id, callback, - is_gw: new_acceptor.courtesy, + is_gw: new_acceptor.transient, }) .await?; @@ -960,7 +960,7 @@ mod tests { self_loc: PeerKeyLocation, accept: bool, next_hop: Option, - courtesy: bool, + transient: bool, } impl TestRelayContext { @@ -969,7 +969,7 @@ mod tests { self_loc, accept: true, next_hop: None, - courtesy: false, + transient: false, } } @@ -983,8 +983,8 @@ mod tests { self } - fn courtesy(mut self, courtesy: bool) -> Self { - self.courtesy = courtesy; + fn transient(mut self, transient: bool) -> Self { + self.transient = transient; self } } @@ -1006,8 +1006,8 @@ mod tests { self.next_hop.clone() } - fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { - self.courtesy + fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool { + self.transient } } @@ -1034,17 +1034,17 @@ mod tests { observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: 
false, observed_sent: false, accepted_locally: false, }; - let ctx = TestRelayContext::new(self_loc.clone()).courtesy(true); + let ctx = TestRelayContext::new(self_loc.clone()).transient(true); let actions = state.handle_request(&ctx, &joiner); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); - assert!(response.courtesy); + assert!(response.transient); assert_eq!(actions.expect_connection_from.unwrap().peer, joiner.peer); assert!(actions.forward.is_none()); } @@ -1064,7 +1064,7 @@ mod tests { observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1099,7 +1099,7 @@ mod tests { observed_addr: Some(observed_addr), }, forwarded_to: None, - courtesy_hint: false, + transient_hint: false, observed_sent: false, accepted_locally: false, }; @@ -1127,13 +1127,13 @@ mod tests { let response = ConnectResponse { acceptor: acceptor.clone(), - courtesy: false, + transient: false, }; let result = state.register_acceptance(&response, Instant::now()); assert!(result.satisfied); let new = result.new_acceptor.expect("expected new acceptor"); assert_eq!(new.peer.peer, acceptor.peer); - assert!(!new.courtesy); + assert!(!new.transient); } #[test] diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index d361e2ca2..8d2744bf0 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -358,6 +358,7 @@ impl ConnectionManager { self.peer_key.lock().clone() } + #[allow(dead_code)] pub fn is_gateway(&self) -> bool { self.is_gateway } @@ -536,8 +537,18 @@ impl ConnectionManager { tracing::debug!("no location found for peer, skip pruning"); return None; } else { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + let prev = self + .reserved_connections + 
.load(std::sync::atomic::Ordering::SeqCst); + if prev == 0 { + tracing::warn!( + %peer, + "prune_connection: no reserved slots to release for in-transit peer" + ); + } else { + self.reserved_connections + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } } return None; }; @@ -582,6 +593,15 @@ impl ConnectionManager { router: &Router, ) -> Option { let connections = self.connections_by_location.read(); + tracing::debug!( + total_locations = connections.len(), + self_peer = self + .get_peer_key() + .as_ref() + .map(|id| id.to_string()) + .unwrap_or_else(|| "unknown".into()), + "routing: considering connections" + ); let peers = connections.values().filter_map(|conns| { let conn = conns.choose(&mut rand::rng())?; if self.is_transient(&conn.location.peer) { diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 7c68f0248..8b81e84fb 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -603,13 +603,18 @@ impl Ring { %ideal_location, num_connections, skip_list_size = skip_list.len(), + self_peer = %self.connection_manager.get_peer_key().as_ref().map(|id| id.to_string()).unwrap_or_else(|| "unknown".into()), "Looking for peer to route through" ); if let Some(target) = self.connection_manager .routing(ideal_location, None, skip_list, &router) { - tracing::debug!(query_target = %target, "Found routing target"); + tracing::info!( + query_target = %target, + %ideal_location, + "connection_maintenance selected routing target" + ); target } else { tracing::warn!( From 42875ece51432fea764d27240269e94a6d66aad3 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 21:36:13 -0600 Subject: [PATCH 10/18] test: fix ExpectedInboundTracker helper for transient rename --- crates/core/src/node/network_bridge/handshake.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index c2f3c8b31..ef516a0ae 100644 --- 
a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -184,7 +184,7 @@ impl ExpectedInboundTracker { #[cfg(test)] fn contains(&self, addr: SocketAddr) -> bool { - self.entries.contains_key(&addr) + self.entries.contains_key(&addr.ip()) } } From 508e2d7a2a374e39e14a0bab8448cf8ab2f257b6 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 22:24:04 -0600 Subject: [PATCH 11/18] feat: expose connection tuning and bump test harness --- Cargo.lock | 2 +- apps/freenet-ping/app/tests/common/mod.rs | 2 + crates/core/Cargo.toml | 2 +- crates/core/src/config/mod.rs | 80 +++++++++++++++++++++++ crates/core/src/node/mod.rs | 4 +- crates/core/tests/error_notification.rs | 4 ++ crates/core/tests/operations.rs | 2 + crates/core/tests/ubertest.rs | 2 + crates/freenet-macros/src/codegen.rs | 4 ++ 9 files changed, 98 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38a2c9f91..6f2dfb7f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,7 +1813,7 @@ dependencies = [ [[package]] name = "freenet-test-network" -version = "0.1.1" +version = "0.1.2" dependencies = [ "anyhow", "chrono", diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 665962391..906e36832 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -125,6 +125,8 @@ pub async fn base_node_test_config_with_rng( ignore_protocol_checking: true, address: Some(Ipv4Addr::LOCALHOST.into()), network_port: public_port, // if None, node will pick a free one or use default + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses, transient_budget: None, diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index bfea19de1..98d266021 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -90,7 +90,7 @@ arbitrary = { features = ["derive"], version = "1" } chrono = { features = ["arbitrary"], 
workspace = true } freenet-stdlib = { features = ["net", "testing"], workspace = true } freenet-macros = { path = "../freenet-macros" } -freenet-test-network = { version = "0.1.1", path = "../../../../freenet-test-network" } +freenet-test-network = { version = "0.1.2", path = "../../../../freenet-test-network" } httptest = "0.16" statrs = "0.18" tempfile = "3" diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 36072d01f..907a8fa75 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -102,6 +102,8 @@ impl Default for ConfigArgs { blocked_addresses: None, transient_budget: Some(DEFAULT_TRANSIENT_BUDGET), transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS), + min_connections: None, + max_connections: None, }, ws_api: WebsocketApiArgs { address: Some(default_listening_address()), @@ -243,6 +245,38 @@ impl ConfigArgs { self.ws_api .token_cleanup_interval_seconds .get_or_insert(cfg.ws_api.token_cleanup_interval_seconds); + self.network_api + .address + .get_or_insert(cfg.network_api.address); + self.network_api + .network_port + .get_or_insert(cfg.network_api.port); + if let Some(addr) = cfg.network_api.public_address { + self.network_api.public_address.get_or_insert(addr); + } + if let Some(port) = cfg.network_api.public_port { + self.network_api.public_port.get_or_insert(port); + } + if let Some(limit) = cfg.network_api.bandwidth_limit { + self.network_api.bandwidth_limit.get_or_insert(limit); + } + if let Some(addrs) = cfg.network_api.blocked_addresses { + self.network_api + .blocked_addresses + .get_or_insert_with(|| addrs.into_iter().collect()); + } + self.network_api + .transient_budget + .get_or_insert(cfg.network_api.transient_budget); + self.network_api + .transient_ttl_secs + .get_or_insert(cfg.network_api.transient_ttl_secs); + self.network_api + .min_connections + .get_or_insert(cfg.network_api.min_connections); + self.network_api + .max_connections + .get_or_insert(cfg.network_api.max_connections); 
self.log_level.get_or_insert(cfg.log_level); self.config_paths.merge(cfg.config_paths.as_ref().clone()); } @@ -374,6 +408,14 @@ impl ConfigArgs { .network_api .transient_ttl_secs .unwrap_or(DEFAULT_TRANSIENT_TTL_SECS), + min_connections: self + .network_api + .min_connections + .unwrap_or(DEFAULT_MIN_CONNECTIONS), + max_connections: self + .network_api + .max_connections + .unwrap_or(DEFAULT_MAX_CONNECTIONS), }, ws_api: WebsocketApiConfig { // the websocket API is always local @@ -565,6 +607,22 @@ pub struct NetworkArgs { #[arg(long, env = "TRANSIENT_TTL_SECS")] #[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")] pub transient_ttl_secs: Option, + + /// Minimum desired connections for the ring topology. Defaults to 10. + #[arg(long = "min-number-of-connections", env = "MIN_NUMBER_OF_CONNECTIONS")] + #[serde( + rename = "min-number-of-connections", + skip_serializing_if = "Option::is_none" + )] + pub min_connections: Option, + + /// Maximum allowed connections for the ring topology. Defaults to 20. + #[arg(long = "max-number-of-connections", env = "MAX_NUMBER_OF_CONNECTIONS")] + #[serde( + rename = "max-number-of-connections", + skip_serializing_if = "Option::is_none" + )] + pub max_connections: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -639,6 +697,20 @@ pub struct NetworkApiConfig { /// Time (in seconds) before an unpromoted transient connection is dropped. #[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")] pub transient_ttl_secs: u64, + + /// Minimum desired connections for the ring topology. + #[serde( + default = "default_min_connections", + rename = "min-number-of-connections" + )] + pub min_connections: usize, + + /// Maximum allowed connections for the ring topology. 
+ #[serde( + default = "default_max_connections", + rename = "max-number-of-connections" + )] + pub max_connections: usize, } mod port_allocation; @@ -656,6 +728,14 @@ fn default_transient_ttl_secs() -> u64 { DEFAULT_TRANSIENT_TTL_SECS } +fn default_min_connections() -> usize { + DEFAULT_MIN_CONNECTIONS +} + +fn default_max_connections() -> usize { + DEFAULT_MAX_CONNECTIONS +} + #[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)] pub struct WebsocketApiArgs { /// Address to bind to for the websocket API, default is 0.0.0.0 diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index e5446027b..16fabea01 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -192,8 +192,8 @@ impl NodeConfig { config: Arc::new(config.clone()), max_hops_to_live: None, rnd_if_htl_above: None, - max_number_conn: None, - min_number_conn: None, + max_number_conn: Some(config.network_api.max_connections), + min_number_conn: Some(config.network_api.min_connections), max_upstream_bandwidth: None, max_downstream_bandwidth: None, blocked_addresses: config.network_api.blocked_addresses.clone(), diff --git a/crates/core/tests/error_notification.rs b/crates/core/tests/error_notification.rs index 7f59a642c..392a78019 100644 --- a/crates/core/tests/error_notification.rs +++ b/crates/core/tests/error_notification.rs @@ -298,6 +298,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> { ignore_protocol_checking: true, address: Some(Ipv4Addr::LOCALHOST.into()), network_port: Some(gateway_port), + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: None, @@ -349,6 +351,8 @@ async fn test_connection_drop_error_notification() -> anyhow::Result<()> { ignore_protocol_checking: true, address: Some(Ipv4Addr::LOCALHOST.into()), network_port: None, + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: 
None, diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index e200a2e6b..5224682e4 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -93,6 +93,8 @@ async fn base_node_test_config( ignore_protocol_checking: true, address: Some(Ipv4Addr::LOCALHOST.into()), network_port: public_port, + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: None, diff --git a/crates/core/tests/ubertest.rs b/crates/core/tests/ubertest.rs index 5ed7fc2ef..de7ed05c3 100644 --- a/crates/core/tests/ubertest.rs +++ b/crates/core/tests/ubertest.rs @@ -186,6 +186,8 @@ async fn create_peer_config( ignore_protocol_checking: true, address: Some(peer_ip.into()), network_port: Some(network_port), + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: None, diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index e7a7f76a7..cc9fc454b 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -147,6 +147,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { ignore_protocol_checking: true, address: Some(std::net::Ipv4Addr::LOCALHOST.into()), network_port: Some(network_port), + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: None, @@ -259,6 +261,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { ignore_protocol_checking: true, address: Some(std::net::Ipv4Addr::LOCALHOST.into()), network_port: Some(network_port), + min_connections: None, + max_connections: None, bandwidth_limit: None, blocked_addresses: None, transient_budget: None, From 60aad026eeb5c2913cc4ead844b72440aaa8a646 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 22:34:56 -0600 Subject: [PATCH 12/18] fix: skip topology add when neighbor map empty --- 
crates/core/src/topology/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 6f7f29b55..6ab1531a9 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -480,6 +480,11 @@ impl TopologyManager { &mut self, neighbor_locations: &BTreeMap>, ) -> anyhow::Result { + if neighbor_locations.is_empty() { + tracing::warn!("select_connections_to_add: neighbor map empty; skipping adjustment"); + return Ok(TopologyAdjustment::NoChange); + } + let function_span = span!(Level::INFO, "add_connections"); let _enter = function_span.enter(); From 07d8c67e58b5cee34b7f869f184633f3dcb93363 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Wed, 19 Nov 2025 23:08:46 -0600 Subject: [PATCH 13/18] test: instrument neighbor candidates and live tx tracking --- crates/core/src/ring/live_tx.rs | 4 ++++ crates/core/src/ring/mod.rs | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/core/src/ring/live_tx.rs b/crates/core/src/ring/live_tx.rs index 2a0988a1e..d9750683f 100644 --- a/crates/core/src/ring/live_tx.rs +++ b/crates/core/src/ring/live_tx.rs @@ -45,4 +45,8 @@ impl LiveTransactionTracker { pub(crate) fn still_alive(&self, tx: &Transaction) -> bool { self.tx_per_peer.iter().any(|e| e.value().contains(tx)) } + + pub(crate) fn len(&self) -> usize { + self.tx_per_peer.len() + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 8b81e84fb..ef9a04539 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -480,8 +480,10 @@ impl Ring { let neighbor_locations = { let peers = self.connection_manager.get_connections_by_location(); tracing::debug!( - "Maintenance task: current connections = {}, checking topology", - current_connections + "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + current_connections, + peers.len(), + live_tx_tracker.len() ); peers .iter() From 
729cf78608792b6e81a90a6522b30350b053a661 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 20 Nov 2025 18:33:01 -0600 Subject: [PATCH 14/18] fix: transient connection handling and viz tooling --- Cargo.lock | 19 +-- crates/core/Cargo.toml | 3 +- .../src/node/network_bridge/p2p_protoc.rs | 29 +++- crates/core/src/operations/connect.rs | 24 ++- crates/core/src/ring/connection.rs | 2 - crates/core/src/ring/connection_manager.rs | 139 +++++++----------- crates/core/src/ring/mod.rs | 73 +++++---- crates/core/src/topology/mod.rs | 48 +++++- .../core/src/transport/connection_handler.rs | 65 ++++---- crates/core/src/transport/crypto.rs | 12 +- 10 files changed, 238 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f2dfb7f5..611aeb8bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1227,7 +1227,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1429,7 +1429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1686,6 +1686,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "sha2", "sqlx", "statrs", "stretto", @@ -1813,7 +1814,7 @@ dependencies = [ [[package]] name = "freenet-test-network" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "chrono", @@ -2429,7 +2430,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -2681,7 +2682,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3199,7 +3200,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4320,7 +4321,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5193,7 +5194,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -6443,7 +6444,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 98d266021..f86cfcc81 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -65,6 +65,7 @@ xz2 = { version = "0.1" } reqwest = { version = "0.12", features = ["json"] } rsa = { version = "0.9", features = ["serde", "pem"] } pkcs8 = { version = "0.10", features = ["std", "pem"] } +sha2 = "0.10" # Tracing deps opentelemetry = "0.31" @@ -90,7 +91,7 @@ arbitrary = { features = ["derive"], version = "1" } chrono = { features = ["arbitrary"], workspace = true } freenet-stdlib = { features = ["net", "testing"], workspace = true } freenet-macros = { path = "../freenet-macros" } -freenet-test-network = { version = "0.1.2", path = "../../../../freenet-test-network" } +freenet-test-network = { version = "0.1.3", path = "../../../../freenet-test-network" } httptest = "0.16" statrs = "0.18" tempfile = "3" diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 391560f66..473575ed1 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -635,6 +635,12 @@ impl P2pConnManager { "Failed to enqueue DropConnection command" ); } + // Immediately prune topology counters so we don't leak open connection 
slots. + ctx.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; if let Some(conn) = ctx.connections.remove(&peer) { // TODO: review: this could potentially leave garbage tasks in the background with peer listener match timeout( @@ -1716,11 +1722,32 @@ impl P2pConnManager { let pending_loc = connection_manager.prune_in_transit_connection(&peer_id); if !is_transient { let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr)); + // Re-apply admission logic on promotion to avoid bypassing capacity/heuristic checks. + let should_accept = connection_manager.should_accept(loc, &peer_id); + if !should_accept { + tracing::warn!( + %peer_id, + %loc, + "handle_successful_connection: promotion rejected by admission logic" + ); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + %peer_id, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "handle_successful_connection: rejecting new connection to enforce cap" + ); + return Ok(()); + } tracing::info!(remote = %peer_id, %loc, "handle_successful_connection: promoting connection into ring"); self.bridge .op_manager .ring - .add_connection(loc, peer_id.clone(), false) + .add_connection(loc, peer_id.clone(), true) .await; } else { // Update location now that we know it; budget was reserved before any work. 
diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 4d42d560c..4cf93d79b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -215,6 +215,7 @@ impl RelayState { if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); + let dist = ring_distance(acceptor.location, self.request.joiner.location); let transient = ctx.transient_hint(&acceptor, &self.request.joiner); self.transient_hint = transient; actions.accept_response = Some(ConnectResponse { @@ -222,15 +223,27 @@ impl RelayState { transient, }); actions.expect_connection_from = Some(self.request.joiner.clone()); + tracing::info!( + acceptor_peer = %acceptor.peer, + joiner_peer = %self.request.joiner.peer, + acceptor_loc = ?acceptor.location, + joiner_loc = ?self.request.joiner.location, + ring_distance = ?dist, + transient, + "connect: acceptance issued" + ); } if self.forwarded_to.is_none() && self.request.ttl > 0 { match ctx.select_next_hop(self.request.desired_location, &self.request.visited) { Some(next) => { - tracing::debug!( + let dist = ring_distance(next.location, Some(self.request.desired_location)); + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, next_peer = %next.peer, + next_loc = ?next.location, + ring_distance_to_target = ?dist, "connect: forwarding join request to next hop" ); let mut forward_req = self.request.clone(); @@ -242,7 +255,7 @@ impl RelayState { actions.forward = Some((next, forward_snapshot)); } None => { - tracing::debug!( + tracing::info!( target = %self.request.desired_location, ttl = self.request.ttl, visited = ?self.request.visited, @@ -768,6 +781,13 @@ fn store_operation_state_with_msg(op: &mut ConnectOp, msg: Option) - } } +fn ring_distance(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(a.distance(b).as_f64()), + _ => None, + } +} + 
#[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)] pub(crate) async fn join_ring_request( backoff: Option, diff --git a/crates/core/src/ring/connection.rs b/crates/core/src/ring/connection.rs index 2629886d0..270d3c9e9 100644 --- a/crates/core/src/ring/connection.rs +++ b/crates/core/src/ring/connection.rs @@ -1,8 +1,6 @@ use super::PeerKeyLocation; -use std::time::Instant; #[derive(Clone, Debug)] pub struct Connection { pub(crate) location: PeerKeyLocation, - pub(crate) open_at: Instant, } diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 8d2744bf0..4e515f696 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -22,8 +22,7 @@ pub(crate) struct TransientEntry { #[derive(Clone)] pub(crate) struct ConnectionManager { - open_connections: Arc, - reserved_connections: Arc, + pending_reservations: Arc>>, pub(super) location_for_peer: Arc>>, pub(super) topology_manager: Arc>, connections_by_location: Arc>>>, @@ -124,8 +123,7 @@ impl ConnectionManager { Self { connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), - open_connections: Arc::new(AtomicUsize::new(0)), - reserved_connections: Arc::new(AtomicUsize::new(0)), + pending_reservations: Arc::new(RwLock::new(BTreeMap::new())), topology_manager, own_location: own_location.into(), peer_key: Arc::new(Mutex::new(peer_id)), @@ -148,12 +146,8 @@ impl ConnectionManager { /// Will panic if the node checking for this condition has no location assigned. 
pub fn should_accept(&self, location: Location, peer_id: &PeerId) -> bool { tracing::info!("Checking if should accept connection"); - let open = self - .open_connections - .load(std::sync::atomic::Ordering::SeqCst); - let reserved_before = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst); + let open = self.connection_count(); + let reserved_before = self.pending_reservations.read().len(); tracing::info!( %peer_id, @@ -175,34 +169,16 @@ impl ConnectionManager { ); } - let reserved_before = loop { - let current = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst); - if current == usize::MAX { - tracing::error!( - %peer_id, - "reserved connection counter overflowed; rejecting new connection" - ); - return false; - } - match self.reserved_connections.compare_exchange( - current, - current + 1, - std::sync::atomic::Ordering::SeqCst, - std::sync::atomic::Ordering::SeqCst, - ) { - Ok(_) => break current, - Err(actual) => { - tracing::debug!( - %peer_id, - expected = current, - actual, - "reserved connection counter changed concurrently; retrying" - ); - } - } - }; + if self.location_for_peer.read().get(peer_id).is_some() { + // We've already accepted this peer (pending or active); treat as a no-op acceptance. 
+ tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); + return true; + } + + { + let mut pending = self.pending_reservations.write(); + pending.insert(peer_id.clone(), location); + } let total_conn = match reserved_before .checked_add(1) @@ -216,8 +192,7 @@ impl ConnectionManager { open, "connection counters would overflow; rejecting connection" ); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); return false; } }; @@ -238,19 +213,12 @@ impl ConnectionManager { limit = GATEWAY_DIRECT_ACCEPT_LIMIT, "Gateway reached direct-accept limit; forwarding join request instead" ); - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead"); return false; } } - if self.location_for_peer.read().get(peer_id).is_some() { - // We've already accepted this peer (pending or active); treat as a no-op acceptance. 
- tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance"); - return true; - } - let accepted = if total_conn < self.min_connections { tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)"); true @@ -277,14 +245,13 @@ impl ConnectionManager { accepted, total_conn, open_connections = open, - reserved_connections = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst), + reserved_connections = self.pending_reservations.read().len(), + max_connections = self.max_connections, + min_connections = self.min_connections, "should_accept: final decision" ); if !accepted { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + self.pending_reservations.write().remove(peer_id); } else { tracing::info!(%peer_id, total_conn, "should_accept: accepted (reserving spot)"); self.record_pending_location(peer_id, location); @@ -453,20 +420,30 @@ impl ConnectionManager { tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology"); debug_assert!(self.get_peer_key().expect("should be set") != peer); if was_reserved { - let old = self - .reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - #[cfg(debug_assertions)] - { - tracing::debug!(old, "Decremented reserved connections"); - if old == 0 { - panic!("Underflow of reserved connections"); + self.pending_reservations.write().remove(&peer); + } + let mut lop = self.location_for_peer.write(); + let previous_location = lop.insert(peer.clone(), loc); + drop(lop); + + if let Some(prev_loc) = previous_location { + tracing::info!( + %peer, + %prev_loc, + %loc, + "add_connection: replacing existing connection for peer" + ); + let mut cbl = self.connections_by_location.write(); + if let Some(prev_list) = cbl.get_mut(&prev_loc) { + if let Some(pos) = prev_list.iter().position(|c| c.location.peer == peer) { + prev_list.swap_remove(pos); + } + if prev_list.is_empty() { + cbl.remove(&prev_loc); } } - let 
_ = old; } - let mut lop = self.location_for_peer.write(); - lop.insert(peer.clone(), loc); + { let mut cbl = self.connections_by_location.write(); cbl.entry(loc).or_default().push(Connection { @@ -474,12 +451,8 @@ impl ConnectionManager { peer: peer.clone(), location: Some(loc), }, - open_at: Instant::now(), }); } - self.open_connections - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - std::mem::drop(lop); } pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool { @@ -519,7 +492,6 @@ impl ConnectionManager { peer: new_peer, location: Some(loc), }, - open_at: Instant::now(), }); } @@ -537,17 +509,12 @@ impl ConnectionManager { tracing::debug!("no location found for peer, skip pruning"); return None; } else { - let prev = self - .reserved_connections - .load(std::sync::atomic::Ordering::SeqCst); - if prev == 0 { + let removed = self.pending_reservations.write().remove(peer).is_some(); + if !removed { tracing::warn!( %peer, - "prune_connection: no reserved slots to release for in-transit peer" + "prune_connection: no pending reservation to release for in-transit peer" ); - } else { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); } } return None; @@ -560,20 +527,20 @@ impl ConnectionManager { } } - if is_alive { - self.open_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - } else { - self.reserved_connections - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + if !is_alive { + self.pending_reservations.write().remove(peer); } Some(loc) } - pub(super) fn get_open_connections(&self) -> usize { - self.open_connections - .load(std::sync::atomic::Ordering::SeqCst) + pub(crate) fn connection_count(&self) -> usize { + // Count only established connections tracked by location buckets. 
+ self.connections_by_location + .read() + .values() + .map(|conns| conns.len()) + .sum() } pub(super) fn get_connections_by_location(&self) -> BTreeMap> { diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index ef9a04539..210b16e39 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -3,7 +3,7 @@ //! Mainly maintains a healthy and optimal pool of connections to other peers in the network //! and routes requests to the optimal peers. -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::{ sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, @@ -157,7 +157,7 @@ impl Ring { } pub fn open_connections(&self) -> usize { - self.connection_manager.get_open_connections() + self.connection_manager.connection_count() } async fn refresh_router(router: Arc>, register: ER) { @@ -383,11 +383,6 @@ impl Ring { tracing::info!("Initializing connection maintenance task"); let is_gateway = self.is_gateway; tracing::info!(is_gateway, "Connection maintenance task starting"); - #[cfg(not(test))] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); - #[cfg(test)] - const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(5); - #[cfg(not(test))] const CHECK_TICK_DURATION: Duration = Duration::from_secs(60); #[cfg(test)] @@ -458,7 +453,7 @@ impl Ring { error })?; if live_tx.is_none() { - let conns = self.connection_manager.get_open_connections(); + let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns @@ -475,32 +470,50 @@ impl Ring { } } - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let pending_connection_targets = pending_conn_adds.len(); - let neighbor_locations = { - let peers = self.connection_manager.get_connections_by_location(); - 
tracing::debug!( - "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + let peers = self.connection_manager.get_connections_by_location(); + let connections_considered: usize = peers.values().map(|c| c.len()).sum(); + + let mut neighbor_locations: BTreeMap<_, Vec<_>> = peers + .iter() + .map(|(loc, conns)| { + let conns: Vec<_> = conns + .iter() + .filter(|conn| !live_tx_tracker.has_live_connection(&conn.location.peer)) + .cloned() + .collect(); + (*loc, conns) + }) + .filter(|(_, conns)| !conns.is_empty()) + .collect(); + + if neighbor_locations.is_empty() && connections_considered > 0 { + tracing::warn!( current_connections, - peers.len(), - live_tx_tracker.len() + connections_considered, + live_tx_peers = live_tx_tracker.len(), + "Neighbor filtering removed all candidates; using all connections" ); - peers + + neighbor_locations = peers .iter() - .map(|(loc, conns)| { - let conns: Vec<_> = conns - .iter() - .filter(|conn| { - conn.open_at.elapsed() > CONNECTION_AGE_THRESOLD - && !live_tx_tracker.has_live_connection(&conn.location.peer) - }) - .cloned() - .collect(); - (*loc, conns) - }) + .map(|(loc, conns)| (*loc, conns.clone())) .filter(|(_, conns)| !conns.is_empty()) - .collect() - }; + .collect(); + } + + if current_connections > self.connection_manager.max_connections { + // When over capacity, consider all connections for removal regardless of live_tx filter. 
+ neighbor_locations = peers.clone(); + } + + tracing::debug!( + "Maintenance task: current connections = {}, candidates = {}, live_tx_peers = {}", + current_connections, + peers.len(), + live_tx_tracker.len() + ); let adjustment = self .connection_manager @@ -589,7 +602,7 @@ impl Ring { live_tx_tracker: &LiveTransactionTracker, op_manager: &Arc, ) -> anyhow::Result> { - let current_connections = self.connection_manager.get_open_connections(); + let current_connections = self.connection_manager.connection_count(); let is_gateway = self.is_gateway; tracing::info!( diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 6ab1531a9..447302010 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -9,7 +9,7 @@ use std::{ collections::{BTreeMap, HashMap}, time::Instant, }; -use tracing::{debug, error, event, info, span, Level}; +use tracing::{debug, error, event, info, span, warn, Level}; pub mod connection_evaluator; mod constants; @@ -449,6 +449,28 @@ impl TopologyManager { } } + if current_connections > self.limits.max_connections { + let mut adj = adjustment.unwrap_or(TopologyAdjustment::NoChange); + if matches!(adj, TopologyAdjustment::NoChange) { + if let Some(peer) = select_fallback_peer_to_drop(neighbor_locations, my_location) { + info!( + current_connections, + max_allowed = self.limits.max_connections, + %peer.peer, + "Enforcing max-connections cap via fallback removal" + ); + adj = TopologyAdjustment::RemoveConnections(vec![peer]); + } else { + warn!( + current_connections, + max_allowed = self.limits.max_connections, + "Over capacity but no removable peer found; leaving topology unchanged" + ); + } + } + return adj; + } + adjustment.unwrap_or(TopologyAdjustment::NoChange) } @@ -584,6 +606,30 @@ impl TopologyManager { } } +fn select_fallback_peer_to_drop( + neighbor_locations: &BTreeMap>, + my_location: &Option, +) -> Option { + let mut candidate: Option<(PeerKeyLocation, f64)> = None; + for (loc, 
conns) in neighbor_locations.iter() { + for conn in conns { + let score = match my_location { + Some(me) => me.distance(*loc).as_f64(), + None => 0.0, + }; + if let Some((_, best_score)) = &mut candidate { + if score > *best_score { + *best_score = score; + candidate = Some((conn.location.clone(), score)); + } + } else { + candidate = Some((conn.location.clone(), score)); + } + } + } + candidate.map(|(peer, _)| peer) +} + #[derive(PartialEq, Debug, Clone, Copy)] pub(crate) enum ConnectionAcquisitionStrategy { /// Acquire new connections slowly, be picky diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index c9aa84132..7b26e75fd 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -416,46 +416,41 @@ impl UdpPacketsListener { continue; } - if !self.is_gateway { - let allow = self.expected_non_gateway.contains(&remote_addr.ip()); - let gateway_allow = self - .known_gateway_addrs - .as_ref() - .map(|set| set.contains(&remote_addr)) - .unwrap_or(false); - if !allow && gateway_allow { - tracing::debug!( - %remote_addr, - "allowing inbound handshake from known gateway without prior expectation" - ); - } - if !allow && !gateway_allow { - tracing::warn!( - %remote_addr, - %size, - "unexpected packet from non-gateway node; dropping intro packet" - ); - self.expected_non_gateway.insert(remote_addr.ip()); + let is_known_gateway = self + .known_gateway_addrs + .as_ref() + .map(|set| set.contains(&remote_addr)) + .unwrap_or(false); + + if self.is_gateway || is_known_gateway { + // Handle gateway-intro packets (peer -> gateway) + + // Check if we already have a gateway connection in progress + if ongoing_gw_connections.contains_key(&remote_addr) { + tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); continue; } - } - // Check if we already have a gateway connection in progress - if 
ongoing_gw_connections.contains_key(&remote_addr) { - tracing::debug!(%remote_addr, "gateway connection already in progress, ignoring duplicate packet"); + let inbound_key_bytes = key_from_addr(&remote_addr); + let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); + let task = tokio::spawn(gw_ongoing_connection + .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) + .map_err(move |error| { + tracing::warn!(%remote_addr, %error, "gateway connection error"); + (error, remote_addr) + })); + ongoing_gw_connections.insert(remote_addr, packets_sender); + gw_connection_tasks.push(task); + continue; + } else { + // Non-gateway peers: mark as expected and wait for the normal peer handshake flow. + self.expected_non_gateway.insert(remote_addr.ip()); + tracing::debug!( + %remote_addr, + "unexpected peer intro; marking expected_non_gateway" + ); continue; } - - let inbound_key_bytes = key_from_addr(&remote_addr); - let (gw_ongoing_connection, packets_sender) = self.gateway_connection(packet_data, remote_addr, inbound_key_bytes); - let task = tokio::spawn(gw_ongoing_connection - .instrument(tracing::span!(tracing::Level::DEBUG, "gateway_connection")) - .map_err(move |error| { - tracing::warn!(%remote_addr, %error, "gateway connection error"); - (error, remote_addr) - })); - ongoing_gw_connections.insert(remote_addr, packets_sender); - gw_connection_tasks.push(task); } Err(e) => { tracing::error!("Failed to receive UDP packet: {:?}", e); diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 79cb3673b..342d3791e 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -6,6 +6,7 @@ use rsa::{ Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey, }; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct TransportKeypair { @@ -112,15 +113,8 @@ impl 
std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - if encoded.as_bytes().len() >= 16 { - let bytes = encoded.as_bytes(); - let first_six = &bytes[..6]; - let last_six = &bytes[bytes.len() - 6..]; - let to_encode = [first_six, last_six].concat(); - write!(f, "{}", bs58::encode(to_encode).into_string()) - } else { - write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) - } + let digest = Sha256::digest(encoded.as_bytes()); + write!(f, "{}", bs58::encode(digest).into_string()) } } From 54ec5db10733ac0920f5d0f4177930e198214e1c Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 20 Nov 2025 19:02:55 -0600 Subject: [PATCH 15/18] fix: enforce caps on transient promotion and add cap repro test --- .../src/node/network_bridge/p2p_protoc.rs | 49 ++++++++++++++++++- crates/core/src/ring/connection_manager.rs | 16 ++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 473575ed1..a01c0653b 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1287,10 +1287,57 @@ impl P2pConnManager { let loc = entry .location .unwrap_or_else(|| Location::from_address(&peer.addr)); + // Re-run admission + cap guard when promoting a transient connection. 
+ let should_accept = connection_manager.should_accept(loc, &peer); + if !should_accept { + tracing::warn!( + tx = %tx, + %peer, + %loc, + "connect_peer: promotion rejected by admission logic" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify rejected-promotion callback" + ); + }) + .ok(); + return Ok(()); + } + let current = connection_manager.connection_count(); + if current >= connection_manager.max_connections { + tracing::warn!( + tx = %tx, + %peer, + current_connections = current, + max_connections = connection_manager.max_connections, + %loc, + "connect_peer: rejecting transient promotion to enforce cap" + ); + callback + .send_result(Err(())) + .await + .inspect_err(|err| { + tracing::debug!( + tx = %tx, + remote = %peer, + ?err, + "connect_peer: failed to notify cap-rejection callback" + ); + }) + .ok(); + return Ok(()); + } self.bridge .op_manager .ring - .add_connection(loc, peer.clone(), false) + .add_connection(loc, peer.clone(), true) .await; tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient"); } diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 4e515f696..e0260fd56 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -426,6 +426,22 @@ impl ConnectionManager { let previous_location = lop.insert(peer.clone(), loc); drop(lop); + // Enforce the global cap when adding a new peer (not a relocation). + if previous_location.is_none() && self.connection_count() >= self.max_connections { + tracing::warn!( + %peer, + %loc, + max = self.max_connections, + "add_connection: rejecting new connection to enforce cap" + ); + // Roll back bookkeeping since we're refusing the connection. 
+ self.location_for_peer.write().remove(&peer); + if was_reserved { + self.pending_reservations.write().remove(&peer); + } + return; + } + if let Some(prev_loc) = previous_location { tracing::info!( %peer, From 9340e3b813a27a649deb8c3ceb135b545de4c9c1 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 20 Nov 2025 19:04:31 -0600 Subject: [PATCH 16/18] test: add small cap repro harness --- crates/core/tests/connection_cap.rs | 38 +++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 crates/core/tests/connection_cap.rs diff --git a/crates/core/tests/connection_cap.rs b/crates/core/tests/connection_cap.rs new file mode 100644 index 000000000..82186c451 --- /dev/null +++ b/crates/core/tests/connection_cap.rs @@ -0,0 +1,38 @@ +//! Minimal repro harness for connection-cap enforcement. +//! +//! This test spins up a tiny network (2 gateways + 6 peers) with a low max-connections +//! setting (min=5, max=6) and waits for connectivity. It then inspects diagnostics to +//! ensure no peer reports more than `max` connections. This is intended to quickly catch +//! admission/cap bypass regressions without running the full soak. 
+ +use freenet_test_network::{BuildProfile, FreenetBinary, NetworkBuilder}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn connection_cap_respected() -> anyhow::Result<()> { + let max_connections = 6usize; + let net = NetworkBuilder::new() + .gateways(2) + .peers(6) + .min_connections(4) + .max_connections(max_connections) + .start_stagger(std::time::Duration::from_millis(300)) + .require_connectivity(0.9) + .connectivity_timeout(std::time::Duration::from_secs(40)) + .binary(FreenetBinary::CurrentCrate(BuildProfile::Debug)) + .build() + .await?; + + let snapshot = net.collect_diagnostics().await?; + for peer in snapshot.peers { + let count = peer.connected_peer_ids.len(); + assert!( + count <= max_connections, + "peer {} exceeds max connections ({} > {})", + peer.peer_id, + count, + max_connections + ); + } + + Ok(()) +} From ae7f98d51861f0a984f3d1d9761ecda8d59aa275 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 20 Nov 2025 19:23:25 -0600 Subject: [PATCH 17/18] fix: report ring connections in diagnostics and bound soak caps --- .../src/node/network_bridge/p2p_protoc.rs | 61 +++++++++++++------ crates/core/src/ring/connection_manager.rs | 2 +- crates/core/tests/large_network.rs | 16 +++++ 3 files changed, 59 insertions(+), 20 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index a01c0653b..9f4f03e9b 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -834,15 +834,23 @@ impl P2pConnManager { // Collect network information if config.include_network_info { - let connected_peers: Vec<_> = ctx - .connections - .keys() - .map(|p| (p.to_string(), p.addr.to_string())) - .collect(); + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peers = Vec::new(); + for conns in connections_by_loc.values() { + for conn in conns { + 
connected_peers.push(( + conn.location.peer.to_string(), + conn.location.peer.addr.to_string(), + )); + } + } + connected_peers.sort_by(|a, b| a.0.cmp(&b.0)); + connected_peers.dedup_by(|a, b| a.0 == b.0); response.network_info = Some(NetworkInfo { + active_connections: connected_peers.len(), connected_peers, - active_connections: ctx.connections.len(), }); } @@ -921,28 +929,43 @@ impl P2pConnManager { } } + // Collect topology-backed connection info (exclude transient transports). + let cm = &op_manager.ring.connection_manager; + let connections_by_loc = cm.get_connections_by_location(); + let mut connected_peer_ids = Vec::new(); + if config.include_detailed_peer_info { + use freenet_stdlib::client_api::ConnectedPeerInfo; + for conns in connections_by_loc.values() { + for conn in conns { + connected_peer_ids.push(conn.location.peer.to_string()); + response.connected_peers_detailed.push( + ConnectedPeerInfo { + peer_id: conn.location.peer.to_string(), + address: conn.location.peer.addr.to_string(), + }, + ); + } + } + } else { + for conns in connections_by_loc.values() { + connected_peer_ids.extend( + conns.iter().map(|c| c.location.peer.to_string()), + ); + } + } + connected_peer_ids.sort(); + connected_peer_ids.dedup(); + // Collect system metrics if config.include_system_metrics { let seeding_contracts = op_manager.ring.all_network_subscriptions().len() as u32; response.system_metrics = Some(SystemMetrics { - active_connections: ctx.connections.len() as u32, + active_connections: connected_peer_ids.len() as u32, seeding_contracts, }); } - // Collect detailed peer information if requested - if config.include_detailed_peer_info { - use freenet_stdlib::client_api::ConnectedPeerInfo; - // Populate detailed peer information from actual connections - for peer in ctx.connections.keys() { - response.connected_peers_detailed.push(ConnectedPeerInfo { - peer_id: peer.to_string(), - address: peer.addr.to_string(), - }); - } - } - match timeout( Duration::from_secs(2), 
callback.send(QueryResult::NodeDiagnostics(response)), diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index e0260fd56..214f9715f 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -559,7 +559,7 @@ impl ConnectionManager { .sum() } - pub(super) fn get_connections_by_location(&self) -> BTreeMap> { + pub(crate) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs index 20970a647..dc2f6d1d3 100644 --- a/crates/core/tests/large_network.rs +++ b/crates/core/tests/large_network.rs @@ -38,6 +38,8 @@ const DEFAULT_PEER_COUNT: usize = 38; const DEFAULT_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60); const DEFAULT_SNAPSHOT_ITERATIONS: usize = 5; const DEFAULT_CONNECTIVITY_TARGET: f64 = 0.75; +const DEFAULT_MIN_CONNECTIONS: usize = 5; +const DEFAULT_MAX_CONNECTIONS: usize = 7; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[ignore = "Large soak test - run manually (see file header for instructions)"] @@ -59,10 +61,20 @@ async fn large_network_soak() -> anyhow::Result<()> { .ok() .and_then(|val| val.parse::().ok()) .unwrap_or(DEFAULT_CONNECTIVITY_TARGET); + let min_connections = env::var("SOAK_MIN_CONNECTIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_MIN_CONNECTIONS); + let max_connections = env::var("SOAK_MAX_CONNECTIONS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(DEFAULT_MAX_CONNECTIONS); let network = TestNetwork::builder() .gateways(2) .peers(peer_count) + .min_connections(min_connections) + .max_connections(max_connections) .require_connectivity(connectivity_target) .connectivity_timeout(Duration::from_secs(120)) .preserve_temp_dirs_on_failure(true) @@ -78,6 +90,10 @@ async fn large_network_soak() -> anyhow::Result<()> { peer_count, network.run_root().display() ); + println!( 
+ "Min connections: {}, max connections: {} (override via SOAK_MIN_CONNECTIONS / SOAK_MAX_CONNECTIONS)", + min_connections, max_connections + ); let riverctl_path = which("riverctl") .context("riverctl not found in PATH; install via `cargo install riverctl`")?; From c084c205f8cf840cec1e10ce621991d2af6eaf7b Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 20 Nov 2025 20:03:20 -0600 Subject: [PATCH 18/18] test: add warmup and ring snapshots to soak --- crates/core/tests/large_network.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/core/tests/large_network.rs b/crates/core/tests/large_network.rs index dc2f6d1d3..53bf5eb46 100644 --- a/crates/core/tests/large_network.rs +++ b/crates/core/tests/large_network.rs @@ -37,6 +37,7 @@ use which::which; const DEFAULT_PEER_COUNT: usize = 38; const DEFAULT_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60); const DEFAULT_SNAPSHOT_ITERATIONS: usize = 5; +const DEFAULT_SNAPSHOT_WARMUP: Duration = Duration::from_secs(60); const DEFAULT_CONNECTIVITY_TARGET: f64 = 0.75; const DEFAULT_MIN_CONNECTIONS: usize = 5; const DEFAULT_MAX_CONNECTIONS: usize = 7; @@ -61,6 +62,11 @@ async fn large_network_soak() -> anyhow::Result<()> { .ok() .and_then(|val| val.parse::<f64>().ok()) .unwrap_or(DEFAULT_CONNECTIVITY_TARGET); + let snapshot_warmup = env::var("SOAK_SNAPSHOT_WARMUP_SECS") + .ok() + .and_then(|val| val.parse().ok()) + .map(Duration::from_secs) + .unwrap_or(DEFAULT_SNAPSHOT_WARMUP); let min_connections = env::var("SOAK_MIN_CONNECTIONS") .ok() .and_then(|val| val.parse().ok()) @@ -105,6 +111,13 @@ async fn large_network_soak() -> anyhow::Result<()> { let snapshots_dir = network.run_root().join("large-soak"); fs::create_dir_all(&snapshots_dir)?; + // Allow topology maintenance to run before the first snapshot. 
+ println!( + "Waiting {:?} before first snapshot to allow topology maintenance to converge", + snapshot_warmup + ); + sleep(snapshot_warmup).await; + let mut iteration = 0usize; let mut next_tick = Instant::now(); while iteration < snapshot_iterations { @@ -113,6 +126,11 @@ async fn large_network_soak() -> anyhow::Result<()> { let snapshot_path = snapshots_dir.join(format!("snapshot-{iteration:02}.json")); fs::write(&snapshot_path, to_string_pretty(&snapshot)?)?; + // Also capture ring topology for visualizing evolution over time. + let ring_snapshot = network.ring_snapshot().await?; + let ring_path = snapshots_dir.join(format!("ring-{iteration:02}.json")); + fs::write(&ring_path, to_string_pretty(&ring_snapshot)?)?; + let healthy = snapshot .peers .iter()