From 0a08ba3dc636bfb2ec66d150049ffb008b791219 Mon Sep 17 00:00:00 2001 From: hsantos Date: Fri, 5 Dec 2025 18:00:45 +0100 Subject: [PATCH 01/12] rebase changes --- apps/freenet-ping/Cargo.lock | 2 +- crates/core/src/operations/subscribe.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 37c540d55..ec50d48e0 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -1302,7 +1302,7 @@ dependencies = [ [[package]] name = "freenet" -version = "0.1.42" +version = "0.1.43" dependencies = [ "aes-gcm", "ahash", diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 81f234cd8..708f91bdb 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -309,7 +309,12 @@ async fn complete_local_subscription( key, subscribed: true, }) - .await + .await?; + + // Mark operation as completed. This will handle parent completion if this is a sub-operation. + op_manager.completed(id); + + Ok(()) } pub(crate) struct SubscribeOp { From c506498fcd762762b8f7296aa6daa04faa576725 Mon Sep 17 00:00:00 2001 From: hsantos Date: Thu, 20 Nov 2025 17:17:43 +0100 Subject: [PATCH 02/12] #2106 refactor subscription listener registration for GET/PUT operations --- .../app/tests/run_app_blocked_peers.rs | 3 - crates/core/src/client_events/mod.rs | 159 +++++++++++------- 2 files changed, 95 insertions(+), 67 deletions(-) diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index d6676c951..4e6924bfe 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -788,7 +788,6 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { /// Standard blocked peers test (baseline) #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "baseline", @@ -808,7 +807,6 @@ async fn test_ping_blocked_peers() -> TestResult { /// Simple blocked peers test #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers_simple() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "simple", @@ -835,7 +833,6 @@ async fn test_ping_blocked_peers_simple() -> TestResult { // fails with "Connection reset without closing handshake" during cleanup. // Likely a test teardown race rather than functional bug. #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers_solution() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "solution", diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index 9c38ed737..193a4bf0b 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -203,6 +203,47 @@ pub trait ClientEventsProxy { ) -> BoxFuture<'_, Result<(), ClientError>>; } +/// Helper function to register a subscription listener for GET/PUT operations with auto-subscribe +async fn register_subscription_listener( + op_manager: &OpManager, + key: ContractKey, + client_id: ClientId, + subscription_listener: UnboundedSender, + operation_type: &str, +) -> Result<(), Error> { + tracing::debug!(%client_id, %key, "Registering subscription for {} with auto-subscribe", operation_type); + let register_listener = op_manager + .notify_contract_handler(ContractHandlerEvent::RegisterSubscriberListener { + key, + client_id, + summary: None, // No summary for GET/PUT-based subscriptions + subscriber_listener: subscription_listener, + }) + .await + .inspect_err(|err| { + tracing::error!( + %client_id, %key, + "Register subscriber listener error for {}: {}", operation_type, err + ); + }); + match register_listener { + Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => { + tracing::debug!( + %client_id, %key, + "Subscriber listener registered successfully for {}", operation_type + ); + Ok(()) + } + _ => { + tracing::error!( + %client_id, %key, + "Subscriber listener registration failed for {}", operation_type + ); + Err(Error::Op(OpError::UnexpectedOpState)) + } + } +} + /// Process client events. /// /// # Architecture: Dual-Mode Client Handling @@ -222,6 +263,8 @@ where ClientEv: ClientEventsProxy + Send + 'static, { // Create RequestRouter for centralized request deduplication + // NOTE: This is ALWAYS Some, so the legacy mode code paths below are never executed. + // TODO: Consider removing legacy mode in a future PR and making request_router non-optional. let request_router = Some(std::sync::Arc::new(crate::node::RequestRouter::new())); let mut results = FuturesUnordered::new(); loop { @@ -564,6 +607,8 @@ async fn process_open_request( ); } } else { + // TODO: This legacy mode block is never executed since request_router is always Some. + // Consider removing in a future PR. tracing::debug!( peer_id = %peer_id, key = %contract_key, @@ -613,39 +658,14 @@ async fn process_open_request( // Register subscription listener if subscribe=true if subscribe { if let Some(subscription_listener) = subscription_listener { - tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe"); - let register_listener = op_manager - .notify_contract_handler( - ContractHandlerEvent::RegisterSubscriberListener { - key: contract_key, - client_id, - summary: None, // No summary for PUT-based subscriptions - subscriber_listener: subscription_listener, - }, - ) - .await - .inspect_err(|err| { - tracing::error!( - %client_id, %contract_key, - "Register subscriber listener error for PUT: {}", err - ); - }); - match register_listener { - Ok( - ContractHandlerEvent::RegisterSubscriberListenerResponse, - ) => { - tracing::debug!( - %client_id, %contract_key, - "Subscriber listener registered successfully for PUT" - ); - } - _ => { - tracing::error!( - %client_id, %contract_key, - "Subscriber listener registration failed for PUT" - ); - } - } + register_subscription_listener( + &op_manager, + contract_key, + client_id, + subscription_listener, + "PUT", + ) + .await?; } else { tracing::warn!(%client_id, %contract_key, "PUT with subscribe=true but no subscription_listener"); } @@ -889,37 +909,14 @@ async fn process_open_request( // Handle subscription for locally found contracts if subscribe { if let Some(subscription_listener) = subscription_listener { - tracing::debug!(%client_id, %key, "Subscribing to locally found contract"); - let register_listener = op_manager - .notify_contract_handler( - ContractHandlerEvent::RegisterSubscriberListener { - key, - client_id, - summary: None, // No summary for GET-based subscriptions - subscriber_listener: subscription_listener, - }, - ) - .await - .inspect_err(|err| { - tracing::error!( - %client_id, %key, - "Register subscriber listener error for local GET: {}", err - ); - }); - match register_listener { - Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => { - tracing::debug!( - %client_id, %key, - "Subscriber listener registered successfully for local GET" - ); - } - _ => { - tracing::error!( - %client_id, %key, - "Subscriber listener registration failed for local GET" - ); - } - } + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "local GET", + ) + .await?; } else { tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); } @@ -1010,7 +1007,25 @@ async fn process_open_request( "Reusing existing GET operation via RequestRouter - client registered for result", ); } + + // Register subscription listener if subscribe=true + if subscribe { + if let Some(subscription_listener) = subscription_listener { + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "network GET", + ) + .await?; + } else { + tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); + } + } } else { + // TODO: This legacy mode block is never executed since request_router is always Some. + // Consider removing in a future PR. tracing::debug!( this_peer = %peer_id, "Contract not found, starting direct GET operation (legacy mode)", @@ -1050,6 +1065,22 @@ async fn process_open_request( ); } } + + // Register subscription listener if subscribe=true + if subscribe { + if let Some(subscription_listener) = subscription_listener { + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "legacy GET", + ) + .await?; + } else { + tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); + } + } } } ContractRequest::Subscribe { key, summary } => { From f7f2fc693e62d5ffb23a5ce0a09df325481a00c5 Mon Sep 17 00:00:00 2001 From: hsantos Date: Thu, 20 Nov 2025 20:06:23 +0100 Subject: [PATCH 03/12] #2106 address WebSocket backpressure in blocked peers tests with check_interval --- .../app/tests/run_app_blocked_peers.rs | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index 4e6924bfe..a4503f7b3 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -789,6 +789,24 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { /// Standard blocked peers test (baseline) #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_ping_blocked_peers() -> TestResult { + // FIXME: WebSocket backpressure issue requires check_interval workaround + // + // Problem: Sending multiple update rounds without reading responses causes WebSocket + // buffer to fill, making .send() block indefinitely. This happens with even 1 round + // of updates (3 initial + 3 round 1 = 6 total messages). + // + // Systematic testing showed: + // - TEST 1 (subscribe_immediately only): FAILS - blocks at Gateway round 2 + // - TEST 2 (check_interval added): PASSES in 28.24s + // - TEST 3 (0 rounds): PASSES in 35.39s + // - TEST 4 (1 round): FAILS - timeout at 300s + // + // check_interval is the MINIMAL fix (send_refresh_updates is NOT needed): + // 1. Drains WebSocket buffer through periodic get_state() .recv() calls + // 2. Provides early exit when updates propagate (typically after 1 check) + // + // Ideal solution: WebApi should handle backpressure internally (e.g., bounded channel + // with automatic draining or async send with timeout). run_blocked_peers_test(BlockedPeersConfig { test_name: "baseline", initial_wait: Duration::from_secs(10), @@ -797,7 +815,7 @@ async fn test_ping_blocked_peers() -> TestResult { update_wait: Duration::from_secs(5), propagation_wait: Duration::from_secs(8), verbose_logging: false, - check_interval: None, + check_interval: Some(Duration::from_secs(3)), // FIXME: Required for WebSocket backpressure (see above) send_refresh_updates: false, send_final_updates: true, subscribe_immediately: false, @@ -808,18 +826,20 @@ async fn test_ping_blocked_peers() -> TestResult { /// Simple blocked peers test #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_ping_blocked_peers_simple() -> TestResult { + // FIXME: Same WebSocket backpressure issue as baseline test + // See detailed explanation in test_ping_blocked_peers above run_blocked_peers_test(BlockedPeersConfig { test_name: "simple", - initial_wait: Duration::from_secs(10), - operation_timeout: Duration::from_secs(15), - update_rounds: 1, // Only one round of updates - update_wait: Duration::from_secs(3), - propagation_wait: Duration::from_secs(10), // Longer wait for simpler flow + initial_wait: Duration::from_secs(12), + operation_timeout: Duration::from_secs(25), + update_rounds: 1, + update_wait: Duration::from_secs(4), + propagation_wait: Duration::from_secs(12), verbose_logging: false, - check_interval: None, + check_interval: Some(Duration::from_secs(3)), // FIXME: Required for WebSocket backpressure send_refresh_updates: false, send_final_updates: false, - subscribe_immediately: true, + subscribe_immediately: false, }) .await } From 14c4cc5edc84f4ec4f9e5e542136f04fb90ba410 Mon Sep 17 00:00:00 2001 From: hsantos Date: Sun, 23 Nov 2025 18:08:22 +0100 Subject: [PATCH 04/12] use correct subscription tracking in PUT operations --- crates/core/src/operations/put.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 68c1d330d..ec10dfc9e 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -478,8 +478,7 @@ impl Operation for PutOp { // Start subscription // Note: skip_list is no longer used here as subscriptions handle their own routing - let child_tx = - super::start_subscription_request_internal(op_manager, *id, key, false); + let child_tx = super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); op_manager.ring.seed_contract(key); @@ -755,9 +754,8 @@ impl Operation for PutOp { %key, "starting child subscription for PUT operation" ); - let child_tx = super::start_subscription_request_internal( - op_manager, *id, key, false, - ); + let child_tx = + super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); } else { tracing::warn!( @@ -904,9 +902,7 @@ impl Operation for PutOp { // Start subscription and handle dropped contracts let (dropped_contract, old_subscribers) = { - let child_tx = super::start_subscription_request_internal( - op_manager, *id, key, false, - ); + let child_tx = super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); op_manager.ring.seed_contract(key) }; From 99a565af30dc390d7c754782717a3f4d9dbe895c Mon Sep 17 00:00:00 2001 From: hsantos Date: Mon, 1 Dec 2025 22:49:52 +0100 Subject: [PATCH 05/12] improve UPDATE operation handling and logging for asynchronous processing --- crates/core/src/client_events/mod.rs | 46 ++++++++++++++++++---------- crates/core/src/node/mod.rs | 12 ++++++++ 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index 193a4bf0b..01a1d745d 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -781,25 +781,39 @@ async fn process_open_request( "Request-Transaction correlation" ); - if let Err(err) = update::request_update(&op_manager, op).await { - tracing::error!("request update error {}", err); - - // Notify client of error via result router - let error_response = Err(ErrorKind::OperationError { - cause: format!("UPDATE operation failed: {}", err).into(), + match update::request_update(&op_manager, op).await { + Ok(()) => { + // UPDATE completed synchronously } - .into()); - - if let Err(e) = op_manager - .result_router_tx - .send((transaction_id, error_response)) - .await - { - tracing::error!( - "Failed to send UPDATE error to result router: {}. Transaction: {}", - e, transaction_id + Err(OpError::StatePushed) => { + // StatePushed is a control flow signal, not an error + // The operation continues asynchronously via notify_op_change + tracing::debug!( + transaction_id = %transaction_id, + "UPDATE operation continuing asynchronously (StatePushed)" ); } + Err(err) => { + tracing::error!("request update error {}", err); + + // Notify client of error via result router + let error_response = Err(ErrorKind::OperationError { + cause: format!("UPDATE operation failed: {}", err) + .into(), + } + .into()); + + if let Err(e) = op_manager + .result_router_tx + .send((transaction_id, error_response)) + .await + { + tracing::error!( + "Failed to send UPDATE error to result router: {}. Transaction: {}", + e, transaction_id + ); + } + } } } else { tracing::debug!( diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index fd86d08e9..24c4775cb 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -1141,6 +1141,18 @@ async fn handle_pure_network_result( Ok(None) => { tracing::debug!("Network operation returned no result"); } + Err(OpError::StatePushed) => { + // StatePushed is NOT an error - it's a control flow signal indicating + // the operation continues asynchronously via notify_op_change. + // The operation state has been pushed to the op stack and a message + // has been queued for local processing. + tracing::debug!( + "Network operation continuing asynchronously (StatePushed) for transaction: {:?}", + tx + ); + // Return Ok(None) since the operation will continue processing + return Ok(None); + } Err(e) => { tracing::error!("Network operation failed: {}", e); // TODO: Register error event properly From f0de1fd38b7f5c390f24b0e0f3fb40145cac6929 Mon Sep 17 00:00:00 2001 From: hsantos Date: Thu, 4 Dec 2025 18:25:45 +0100 Subject: [PATCH 06/12] some fixes adn tets changes --- apps/freenet-ping/Cargo.lock | 13 ++--- apps/freenet-ping/app/tests/common/mod.rs | 39 +++++++++----- .../app/tests/run_app_blocked_peers.rs | 54 ++++++------------- crates/core/src/client_events/mod.rs | 19 +------ crates/core/src/node/mod.rs | 9 ---- crates/core/src/operations/subscribe.rs | 2 - 6 files changed, 45 insertions(+), 91 deletions(-) diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index ec50d48e0..cd34fabab 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -74,12 +74,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -524,17 +518,16 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ - "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-link 0.1.3", + "windows-link 0.2.1", ] [[package]] diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 906e36832..8bb015efa 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -19,15 +19,17 @@ use futures::{future::BoxFuture, FutureExt}; use rand::{random, Rng, SeedableRng}; use std::io::{Read, Write}; use std::process::{Child, Command, Stdio}; -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; use std::{ collections::HashSet, io, net::{Ipv4Addr, SocketAddr, TcpListener}, path::{Path, PathBuf}, - sync::Mutex, time::Duration, }; + +/// Global lock to prevent concurrent contract compilation which causes race conditions +static COMPILE_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); use tokio::{select, time::sleep}; use tokio_tungstenite::connect_async; use tracing::{info, span, Instrument, Level}; @@ -36,11 +38,6 @@ use serde::{Deserialize, Serialize}; const TARGET_DIR_VAR: &str = "CARGO_TARGET_DIR"; -pub static RNG: LazyLock> = LazyLock::new(|| { - Mutex::new(rand::rngs::StdRng::from_seed( - *b"0102030405060708090a0b0c0d0e0f10", - )) -}); #[derive(Debug)] pub struct PresetConfig { @@ -58,7 +55,6 @@ pub fn get_free_socket_addr() -> Result { } #[allow(clippy::too_many_arguments)] -#[allow(clippy::await_holding_lock)] pub async fn base_node_test_config( is_gateway: bool, gateways: Vec, @@ -68,7 +64,13 @@ pub async fn base_node_test_config( base_tmp_dir: Option<&Path>, blocked_addresses: Option>, ) -> Result<(ConfigArgs, PresetConfig)> { - let mut rng = RNG.lock().unwrap(); + // Create RNG seeded from test name for reproducibility while maintaining isolation + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data_dir_suffix.hash(&mut hasher); + let seed = hasher.finish(); + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + base_node_test_config_with_rng( is_gateway, gateways, @@ -83,7 +85,7 @@ pub async fn base_node_test_config( } #[allow(clippy::too_many_arguments)] -pub async fn base_node_test_config_with_rng( +pub async fn base_node_test_config_with_rng( is_gateway: bool, gateways: Vec, public_port: Option, @@ -91,7 +93,7 @@ pub async fn base_node_test_config_with_rng( data_dir_suffix: &str, base_tmp_dir: Option<&Path>, blocked_addresses: Option>, - rng: &mut rand::rngs::StdRng, + rng: &mut R, ) -> Result<(ConfigArgs, PresetConfig)> { if is_gateway { assert!(public_port.is_some()); @@ -146,13 +148,19 @@ pub async fn base_node_test_config_with_rng( } pub fn gw_config_from_path(port: u16, path: &Path) -> Result { - gw_config_from_path_with_rng(port, path, &mut RNG.lock().unwrap()) + // Create RNG seeded from path for reproducibility while maintaining isolation + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + path.hash(&mut hasher); + let seed = hasher.finish(); + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + gw_config_from_path_with_rng(port, path, &mut rng) } -pub fn gw_config_from_path_with_rng( +pub fn gw_config_from_path_with_rng( port: u16, path: &Path, - rng: &mut rand::rngs::StdRng, + rng: &mut R, ) -> Result { Ok(InlineGwConfig { address: (Ipv4Addr::LOCALHOST, port).into(), @@ -303,6 +311,9 @@ fn find_workspace_root() -> PathBuf { } fn compile_contract(contract_path: &PathBuf) -> anyhow::Result> { + // Acquire lock to prevent concurrent compilations which cause race conditions + let _lock = COMPILE_LOCK.lock().unwrap(); + ensure_target_dir_env(); println!("module path: {contract_path:?}"); let target = std::env::var(TARGET_DIR_VAR) diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index a4503f7b3..4e4c1e822 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -133,6 +133,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { let path = preset.temp_dir.path().to_path_buf(); (cfg, preset, gw_config_from_path(public_port, &path)?) }; + let ws_api_port_gw = config_gw.ws_api.ws_api_port.unwrap(); // Configure Node1 (blocks Node2) @@ -167,7 +168,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { tracing::info!("Node 1 blocks: {:?}", node2_network_addr); tracing::info!("Node 2 blocks: {:?}", node1_network_addr); - // Free socket resources + // Free socket resources before starting nodes std::mem::drop(network_socket_gw); std::mem::drop(ws_api_port_socket_gw); std::mem::drop(network_socket_node1); @@ -789,33 +790,15 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { /// Standard blocked peers test (baseline) #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_ping_blocked_peers() -> TestResult { - // FIXME: WebSocket backpressure issue requires check_interval workaround - // - // Problem: Sending multiple update rounds without reading responses causes WebSocket - // buffer to fill, making .send() block indefinitely. This happens with even 1 round - // of updates (3 initial + 3 round 1 = 6 total messages). - // - // Systematic testing showed: - // - TEST 1 (subscribe_immediately only): FAILS - blocks at Gateway round 2 - // - TEST 2 (check_interval added): PASSES in 28.24s - // - TEST 3 (0 rounds): PASSES in 35.39s - // - TEST 4 (1 round): FAILS - timeout at 300s - // - // check_interval is the MINIMAL fix (send_refresh_updates is NOT needed): - // 1. Drains WebSocket buffer through periodic get_state() .recv() calls - // 2. Provides early exit when updates propagate (typically after 1 check) - // - // Ideal solution: WebApi should handle backpressure internally (e.g., bounded channel - // with automatic draining or async send with timeout). run_blocked_peers_test(BlockedPeersConfig { test_name: "baseline", - initial_wait: Duration::from_secs(10), - operation_timeout: Duration::from_secs(20), + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(45), update_rounds: 3, update_wait: Duration::from_secs(5), - propagation_wait: Duration::from_secs(8), + propagation_wait: Duration::from_secs(15), verbose_logging: false, - check_interval: Some(Duration::from_secs(3)), // FIXME: Required for WebSocket backpressure (see above) + check_interval: Some(Duration::from_secs(4)), send_refresh_updates: false, send_final_updates: true, subscribe_immediately: false, @@ -826,17 +809,15 @@ async fn test_ping_blocked_peers() -> TestResult { /// Simple blocked peers test #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_ping_blocked_peers_simple() -> TestResult { - // FIXME: Same WebSocket backpressure issue as baseline test - // See detailed explanation in test_ping_blocked_peers above run_blocked_peers_test(BlockedPeersConfig { test_name: "simple", - initial_wait: Duration::from_secs(12), - operation_timeout: Duration::from_secs(25), + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(45), update_rounds: 1, - update_wait: Duration::from_secs(4), - propagation_wait: Duration::from_secs(12), + update_wait: Duration::from_secs(5), + propagation_wait: Duration::from_secs(15), verbose_logging: false, - check_interval: Some(Duration::from_secs(3)), // FIXME: Required for WebSocket backpressure + check_interval: Some(Duration::from_secs(4)), send_refresh_updates: false, send_final_updates: false, subscribe_immediately: false, @@ -844,9 +825,6 @@ async fn test_ping_blocked_peers_simple() -> TestResult { .await } -// Note: Redundant tests (optimized, improved, debug, reliable) were removed -// as they only varied in non-functional aspects like timeouts and logging - /// Solution/reference implementation for blocked peers // TODO-MUST-FIX: WebSocket connection reset during teardown - see issue #2108 // Test passes functionally (PUT/GET/Subscribe/state propagation all work) but @@ -856,13 +834,13 @@ async fn test_ping_blocked_peers_simple() -> TestResult { async fn test_ping_blocked_peers_solution() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "solution", - initial_wait: Duration::from_secs(12), - operation_timeout: Duration::from_secs(25), + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(60), update_rounds: 2, - update_wait: Duration::from_secs(4), - propagation_wait: Duration::from_secs(12), + update_wait: Duration::from_secs(6), + propagation_wait: Duration::from_secs(20), verbose_logging: false, - check_interval: Some(Duration::from_secs(3)), // Regular check intervals + check_interval: Some(Duration::from_secs(5)), send_refresh_updates: true, send_final_updates: true, subscribe_immediately: true, diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index 01a1d745d..fd1c16fd1 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -262,9 +262,6 @@ pub async fn client_event_handling( where ClientEv: ClientEventsProxy + Send + 'static, { - // Create RequestRouter for centralized request deduplication - // NOTE: This is ALWAYS Some, so the legacy mode code paths below are never executed. - // TODO: Consider removing legacy mode in a future PR and making request_router non-optional. let request_router = Some(std::sync::Arc::new(crate::node::RequestRouter::new())); let mut results = FuturesUnordered::new(); loop { @@ -607,8 +604,6 @@ async fn process_open_request( ); } } else { - // TODO: This legacy mode block is never executed since request_router is always Some. - // Consider removing in a future PR. tracing::debug!( peer_id = %peer_id, key = %contract_key, @@ -782,17 +777,7 @@ async fn process_open_request( ); match update::request_update(&op_manager, op).await { - Ok(()) => { - // UPDATE completed synchronously - } - Err(OpError::StatePushed) => { - // StatePushed is a control flow signal, not an error - // The operation continues asynchronously via notify_op_change - tracing::debug!( - transaction_id = %transaction_id, - "UPDATE operation continuing asynchronously (StatePushed)" - ); - } + Ok(()) | Err(OpError::StatePushed) => {} Err(err) => { tracing::error!("request update error {}", err); @@ -1038,8 +1023,6 @@ async fn process_open_request( } } } else { - // TODO: This legacy mode block is never executed since request_router is always Some. - // Consider removing in a future PR. tracing::debug!( this_peer = %peer_id, "Contract not found, starting direct GET operation (legacy mode)", diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index 24c4775cb..278560e9e 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -1142,15 +1142,6 @@ async fn handle_pure_network_result( tracing::debug!("Network operation returned no result"); } Err(OpError::StatePushed) => { - // StatePushed is NOT an error - it's a control flow signal indicating - // the operation continues asynchronously via notify_op_change. - // The operation state has been pushed to the op stack and a message - // has been queued for local processing. - tracing::debug!( - "Network operation continuing asynchronously (StatePushed) for transaction: {:?}", - tx - ); - // Return Ok(None) since the operation will continue processing return Ok(None); } Err(e) => { diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 708f91bdb..4af527aee 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -311,9 +311,7 @@ async fn complete_local_subscription( }) .await?; - // Mark operation as completed. This will handle parent completion if this is a sub-operation. op_manager.completed(id); - Ok(()) } From 698b13ab69959846247668089849502539cea4d9 Mon Sep 17 00:00:00 2001 From: hsantos Date: Sat, 6 Dec 2025 09:50:42 +0100 Subject: [PATCH 07/12] implement a contract storage notification system to handle race conditions in subscriptions --- crates/core/src/node/op_state_manager.rs | 40 +++++++- crates/core/src/operations/put.rs | 6 +- crates/core/src/operations/subscribe.rs | 116 ++++++++++++++++++++++- 3 files changed, 156 insertions(+), 6 deletions(-) diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index 424313cd2..a16bb461e 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -16,8 +16,8 @@ use std::{ use dashmap::{DashMap, DashSet}; use either::Either; use freenet_stdlib::prelude::ContractKey; -use parking_lot::RwLock; -use tokio::sync::mpsc; +use parking_lot::{Mutex, RwLock}; +use tokio::sync::{mpsc, oneshot}; use tracing::Instrument; use crate::{ @@ -213,6 +213,9 @@ pub(crate) struct OpManager { pub is_gateway: bool, /// Sub-operation tracking for atomic operation execution sub_op_tracker: SubOperationTracker, + /// Waiters for contract storage notification. + /// Operations can register to be notified when a specific contract is stored. + contract_waiters: Arc>>>>, } impl Clone for OpManager { @@ -228,6 +231,7 @@ impl Clone for OpManager { peer_ready: self.peer_ready.clone(), is_gateway: self.is_gateway, sub_op_tracker: self.sub_op_tracker.clone(), + contract_waiters: self.contract_waiters.clone(), } } } @@ -295,6 +299,7 @@ impl OpManager { peer_ready, is_gateway, sub_op_tracker, + contract_waiters: Arc::new(Mutex::new(std::collections::HashMap::new())), }) } @@ -649,6 +654,37 @@ impl OpManager { .add_transaction(peer_addr, *transaction); } } + + /// Register to be notified when a contract is stored. + /// Returns a receiver that will be signaled when the contract is stored. + /// This is used to handle race conditions where a subscription arrives before + /// the contract has been propagated via PUT. + pub fn wait_for_contract(&self, key: ContractKey) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + let mut waiters = self.contract_waiters.lock(); + waiters.entry(key).or_default().push(tx); + rx + } + + /// Notify all waiters that a contract has been stored. + /// Called after successful contract storage in PUT operations. + pub fn notify_contract_stored(&self, key: &ContractKey) { + let mut waiters = self.contract_waiters.lock(); + if let Some(senders) = waiters.remove(key) { + let count = senders.len(); + for sender in senders { + // Ignore errors if receiver was dropped + let _ = sender.send(()); + } + if count > 0 { + tracing::debug!( + %key, + count, + "Notified waiters that contract has been stored" + ); + } + } + } } async fn notify_transaction_timeout( diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index ec10dfc9e..7cf7974ec 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -1433,7 +1433,11 @@ async fn put_contract( { Ok(ContractHandlerEvent::PutResponse { new_value: Ok(new_val), - }) => Ok(new_val), + }) => { + // Notify any waiters that this contract has been stored + op_manager.notify_contract_stored(&key); + Ok(new_val) + } Ok(ContractHandlerEvent::PutResponse { new_value: Err(err), }) => { diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 4af527aee..12cbfb76a 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -22,6 +22,9 @@ use tokio::time::{sleep, Duration}; const MAX_RETRIES: usize = 10; const LOCAL_FETCH_TIMEOUT_MS: u64 = 1_500; const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25; +/// Timeout for waiting on contract storage notification. +/// Used when a subscription arrives before the contract has been propagated via PUT. +const CONTRACT_WAIT_TIMEOUT_MS: u64 = 2_000; fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec { op_manager @@ -553,6 +556,77 @@ impl Operation for SubscribeOp { ); } + // Contract not found locally. Wait briefly in case a PUT is in flight. + // This handles race conditions where subscription arrives before the contract. + // Uses channel-based notification instead of polling for efficiency. + tracing::debug!( + tx = %id, + %key, + "subscribe: contract not found, waiting for possible in-flight PUT" + ); + + // Register to be notified when the contract is stored + let contract_notifier = op_manager.wait_for_contract(*key); + + // Wait for either notification or timeout + let contract_arrived = tokio::select! { + _ = contract_notifier => { + // Notification received - contract was stored + true + } + _ = sleep(Duration::from_millis(CONTRACT_WAIT_TIMEOUT_MS)) => { + // Timeout - check one final time in case of race + super::has_contract(op_manager, *key).await.unwrap_or(false) + } + }; + + if contract_arrived { + tracing::info!( + tx = %id, + %key, + "subscribe: contract arrived via notification, handling locally" + ); + + // Contract arrived, register subscription + if op_manager + .ring + .add_subscriber(key, subscriber.clone(), None) + .is_err() + { + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: false, + }; + return Ok(OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + }); + } + + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: true, + }; + return build_op_result( + self.id, + None, + Some(return_msg), + self.upstream_addr, + ); + } + + // Contract still not found after waiting, try to forward + tracing::debug!( + tx = %id, + %key, + "subscribe: contract not found after waiting, attempting to forward" + ); + let own_addr = own_loc .socket_addr() .expect("own location must have socket address"); @@ -569,9 +643,31 @@ impl Operation for SubscribeOp { .socket_addr() .map(|addr| addr != own_addr) .unwrap_or(false) - }) - .ok_or(RingError::NoCachingPeers(*key)) - .map_err(OpError::from)?; + }); + + // If no forward target available, send ReturnSub(subscribed: false) back + // This allows the subscriber to complete locally if they have the contract + let forward_target = match forward_target { + Some(target) => target, + None => { + tracing::warn!( + tx = %id, + %key, + "subscribe: no forward target available, returning unsubscribed" + ); + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: false, + }; + return Ok(OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + }); + } + }; let forward_target_addr = forward_target .socket_addr() @@ -867,6 +963,20 @@ impl Operation for SubscribeOp { retries: retries + 1, }); } else { + // No more candidates - try to complete locally as fallback + if super::has_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "No remote peers, completing subscription locally as fallback" + ); + complete_local_subscription(op_manager, *id, *key).await?; + return Ok(OperationResult { + return_msg: None, + target_addr: None, + state: None, + }); + } return Err(RingError::NoCachingPeers(*key).into()); } new_state = Some(SubscribeState::AwaitingResponse { From 99574502b54c741539cda34c5e717713682f9aba Mon Sep 17 00:00:00 2001 From: hsantos Date: Sat, 6 Dec 2025 10:14:36 +0100 Subject: [PATCH 08/12] fix dependencies in Cargo.toml for workspace compatibility and update import in ping_client.rs --- apps/freenet-ping/Cargo.toml | 3 ++- apps/freenet-ping/app/Cargo.toml | 2 +- apps/freenet-ping/app/src/ping_client.rs | 2 +- apps/freenet-ping/types/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/freenet-ping/Cargo.toml b/apps/freenet-ping/Cargo.toml index 79a379bde..3963ddeab 100644 --- a/apps/freenet-ping/Cargo.toml +++ b/apps/freenet-ping/Cargo.toml @@ -4,9 +4,10 @@ members = ["contracts/ping", "app", "types"] [workspace.dependencies] # freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] } -freenet-stdlib = { version = "0.1.24" } +freenet-stdlib = { version = "0.1.24" } freenet-ping-types = { path = "types", default-features = false } chrono = { version = "0.4", default-features = false } +clap = "4" testresult = "0.4" [profile.dev.package."*"] diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml index 97ab86cdc..345a15ef2 100644 --- a/apps/freenet-ping/app/Cargo.toml +++ b/apps/freenet-ping/app/Cargo.toml @@ -10,7 +10,7 @@ manual-tests = [] [dependencies] anyhow = "1.0" chrono = { workspace = true, features = ["default"] } -clap = { version = "4.5", features = ["derive"] } +clap = { workspace = true, features = ["derive"] } freenet-stdlib = { version = "0.1.24", features = ["net"] } freenet-ping-types = { path = "../types", features = ["std", "clap"] } futures = "0.3.31" diff --git a/apps/freenet-ping/app/src/ping_client.rs b/apps/freenet-ping/app/src/ping_client.rs index a8396a64f..f4aa8df15 100644 --- a/apps/freenet-ping/app/src/ping_client.rs +++ b/apps/freenet-ping/app/src/ping_client.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; -use chrono::{DateTime, Utc}; +use freenet_ping_types::chrono::{DateTime, Utc}; use freenet_ping_types::{Ping, PingContractOptions}; use freenet_stdlib::client_api::{ ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi, diff --git a/apps/freenet-ping/types/Cargo.toml b/apps/freenet-ping/types/Cargo.toml index 949a4da89..608138d17 100644 --- a/apps/freenet-ping/types/Cargo.toml +++ b/apps/freenet-ping/types/Cargo.toml @@ -17,6 +17,6 @@ humantime = "2" humantime-serde = "1" serde = { version = "1", features = ["derive"] } chrono = { workspace = true, features = ["serde"] } -clap = { version = "4", features = ["derive"], optional = true } +clap = { workspace = true, features = ["derive"], optional = true } freenet-stdlib = { workspace = true } From 1ca741f2e7934e9abbda23c2d6d05bde67475f56 Mon Sep 17 00:00:00 2001 From: hsantos Date: Sat, 6 Dec 2025 10:34:03 +0100 Subject: [PATCH 09/12] format code --- apps/freenet-ping/app/tests/common/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 8bb015efa..e0003c77a 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -38,7 +38,6 @@ use serde::{Deserialize, Serialize}; const TARGET_DIR_VAR: &str = "CARGO_TARGET_DIR"; - #[derive(Debug)] pub struct PresetConfig { pub temp_dir: tempfile::TempDir, From 127b02ba40b2dc69c8d3348a1c7774415ae3dd94 Mon Sep 17 00:00:00 2001 From: hsantos Date: Sat, 6 Dec 2025 12:07:15 +0100 Subject: [PATCH 10/12] improve contract notification handling to address race conditions in subscriptions --- crates/core/src/node/op_state_manager.rs | 6 ++- crates/core/src/operations/subscribe.rs | 63 ++++++++++++++++-------- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index a16bb461e..f594abc7c 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -668,12 +668,16 @@ impl OpManager { /// Notify all waiters that a contract has been stored. /// Called after successful contract storage in PUT operations. + /// + /// Note: Stale waiters (from timed-out operations) are automatically cleaned up + /// here when we remove all senders for the key. The send() will fail silently + /// for dropped receivers, which is harmless. pub fn notify_contract_stored(&self, key: &ContractKey) { let mut waiters = self.contract_waiters.lock(); if let Some(senders) = waiters.remove(key) { let count = senders.len(); for sender in senders { - // Ignore errors if receiver was dropped + // Ignore errors if receiver was dropped (e.g., operation timed out) let _ = sender.send(()); } if count > 0 { diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 12cbfb76a..3928eb108 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -26,6 +26,43 @@ const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25; /// Used when a subscription arrives before the contract has been propagated via PUT. const CONTRACT_WAIT_TIMEOUT_MS: u64 = 2_000; +/// Wait for a contract to become available, using channel-based notification. +/// +/// This handles the race condition where a subscription arrives before the contract +/// has been propagated via PUT. The flow is: +/// 1. Fast path: check if contract already exists +/// 2. Register notification waiter +/// 3. Check again (handles race between step 1 and 2) +/// 4. Wait for notification or timeout +/// 5. Final verification of actual state +async fn wait_for_contract_with_timeout( + op_manager: &OpManager, + key: ContractKey, + timeout_ms: u64, +) -> Result { + // Fast path - contract already exists + if super::has_contract(op_manager, key).await? { + return Ok(true); + } + + // Register waiter BEFORE second check to avoid race condition + let notifier = op_manager.wait_for_contract(key); + + // Check again - contract may have arrived between first check and registration + if super::has_contract(op_manager, key).await? { + return Ok(true); + } + + // Wait for notification or timeout (we don't care which triggers first) + let _ = tokio::select! { + _ = notifier => {} + _ = sleep(Duration::from_millis(timeout_ms)) => {} + }; + + // Always verify actual state - don't trust notification alone + super::has_contract(op_manager, key).await +} + fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec { op_manager .ring @@ -557,37 +594,23 @@ impl Operation for SubscribeOp { } // Contract not found locally. Wait briefly in case a PUT is in flight. - // This handles race conditions where subscription arrives before the contract. - // Uses channel-based notification instead of polling for efficiency. tracing::debug!( tx = %id, %key, "subscribe: contract not found, waiting for possible in-flight PUT" ); - // Register to be notified when the contract is stored - let contract_notifier = op_manager.wait_for_contract(*key); - - // Wait for either notification or timeout - let contract_arrived = tokio::select! { - _ = contract_notifier => { - // Notification received - contract was stored - true - } - _ = sleep(Duration::from_millis(CONTRACT_WAIT_TIMEOUT_MS)) => { - // Timeout - check one final time in case of race - super::has_contract(op_manager, *key).await.unwrap_or(false) - } - }; - - if contract_arrived { + // Wait for contract with timeout (handles race conditions internally) + if wait_for_contract_with_timeout(op_manager, *key, CONTRACT_WAIT_TIMEOUT_MS) + .await? + { tracing::info!( tx = %id, %key, - "subscribe: contract arrived via notification, handling locally" + "subscribe: contract arrived, handling locally" ); - // Contract arrived, register subscription + // Contract exists, register subscription if op_manager .ring .add_subscriber(key, subscriber.clone(), None) From a4186370659eb2e57a8745913525b15243e1671a Mon Sep 17 00:00:00 2001 From: hsantos Date: Sat, 6 Dec 2025 12:12:28 +0100 Subject: [PATCH 11/12] fix clippy error --- crates/core/src/operations/subscribe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 3928eb108..474d96e94 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -54,7 +54,7 @@ async fn wait_for_contract_with_timeout( } // Wait for notification or timeout (we don't care which triggers first) - let _ = tokio::select! { + tokio::select! { _ = notifier => {} _ = sleep(Duration::from_millis(timeout_ms)) => {} }; From 68f413d0f1fedf8b83c02f024311907ed75f9cdc Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 6 Dec 2025 13:17:04 -0600 Subject: [PATCH 12/12] fix: resolve flaky keypair collision tests and clippy warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two separate issues: 1. test_macro_example flaky tests: The duplicate keypair detection used Display impl of TransportPublicKey, which only captured 12 bytes of DER encoding. Analysis showed: - First 6 bytes: DER headers (identical for all 2048-bit RSA keys) - Last 6 bytes: Contains exponent 65537 (identical for all keys) - Only ~1 byte of actual entropy Fix: Use full public key comparison (via Hash impl) for uniqueness Also improved Display impl to use bytes from the modulus (actual random data) 2. transport_perf benchmark clippy errors (from merge): - explicit_counter_loop: Use for loop with range instead of manual counter - while_let_loop: Convert loop+match to while let 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/benches/transport_perf.rs | 33 ++++++++------------ crates/core/src/transport/crypto.rs | 45 +++++++++++++++++++++++---- crates/freenet-macros/src/codegen.rs | 9 +++--- 3 files changed, 57 insertions(+), 30 deletions(-) diff --git a/crates/core/benches/transport_perf.rs b/crates/core/benches/transport_perf.rs index 701d2299e..21328df99 100644 --- a/crates/core/benches/transport_perf.rs +++ b/crates/core/benches/transport_perf.rs @@ -1284,15 +1284,13 @@ mod experimental_combined { }); // Sender: serialize, encrypt, send to channel - let mut nonce_counter = 0u64; - for _ in 0..PACKET_COUNT { + for nonce_counter in 0u64..PACKET_COUNT as u64 { // Allocate buffer for packet let mut packet = vec![0u8; size + 28]; // +28 for nonce+tag // Create nonce let mut nonce = [0u8; 12]; nonce[4..].copy_from_slice(&nonce_counter.to_le_bytes()); - nonce_counter += 1; // Copy nonce to packet packet[..12].copy_from_slice(&nonce); @@ -1493,24 +1491,19 @@ mod experimental_syscall_batching { let rt = tokio::runtime::Handle::current(); let mut batch = Vec::with_capacity(BATCH_SIZE); - loop { - // Try to receive up to BATCH_SIZE packets - match rt.block_on(rx.recv()) { - Some(packet) => { - batch.push(packet); - // Drain available packets up to batch size - while batch.len() < BATCH_SIZE { - match rx.try_recv() { - Ok(p) => batch.push(p), - Err(_) => break, - } - } - // Send batch - for packet in batch.drain(..) { - socket.send(&packet).unwrap(); - } + // Try to receive up to BATCH_SIZE packets + while let Some(packet) = rt.block_on(rx.recv()) { + batch.push(packet); + // Drain available packets up to batch size + while batch.len() < BATCH_SIZE { + match rx.try_recv() { + Ok(p) => batch.push(p), + Err(_) => break, } - None => break, + } + // Send batch + for packet in batch.drain(..) { + socket.send(&packet).unwrap(); } } }); diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 79cb3673b..b7e68bba2 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -112,14 +112,23 @@ impl std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - if encoded.as_bytes().len() >= 16 { - let bytes = encoded.as_bytes(); - let first_six = &bytes[..6]; - let last_six = &bytes[bytes.len() - 6..]; - let to_encode = [first_six, last_six].concat(); + let bytes = encoded.as_bytes(); + + // For RSA 2048-bit keys, DER encoding is ~294 bytes: + // - Bytes 0-~22: DER structure headers (same for all 2048-bit keys) + // - Bytes ~23-~279: Modulus (256 bytes of random data) + // - Bytes ~280-~293: Exponent (typically 65537, same for all keys) + // + // To create a short but unique display string, we use 12 bytes from + // the middle of the modulus where the actual random data lives. + if bytes.len() >= 150 { + // Take 12 bytes from the middle of the modulus (around byte 100-112) + let mid_start = 100; + let to_encode = &bytes[mid_start..mid_start + 12]; write!(f, "{}", bs58::encode(to_encode).into_string()) } else { - write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) + // Fallback for smaller keys + write!(f, "{}", bs58::encode(bytes).into_string()) } } } @@ -162,3 +171,27 @@ fn key_sizes_and_decryption() { let bytes = pair.secret.decrypt(&encrypted).unwrap(); assert_eq!(bytes, sym_key_bytes.as_slice()); } + +#[cfg(test)] +mod display_uniqueness_tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn display_produces_unique_strings_for_100_keys() { + // Verify the Display impl produces unique strings by checking 100 keys. + // This test was added after discovering the original Display impl only + // had ~1 byte of entropy (see PR #2233 for details). + let mut seen = HashSet::new(); + for i in 0..100 { + let keypair = TransportKeypair::new(); + let display_str = format!("{}", keypair.public()); + assert!( + seen.insert(display_str.clone()), + "Duplicate display string found at key {}: {}", + i, + display_str + ); + } + } +} diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index d3d376ba9..e6aba8038 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -199,17 +199,18 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { .collect(); // Verify all public keys are unique (sanity check against RNG issues) + // Note: We use the full public key for comparison (via its Hash impl), + // NOT the Display string which only captures 12 bytes of the DER encoding. { - let mut seen_keys = std::collections::HashSet::new(); + let mut seen_keys = std::collections::HashSet::::new(); for (idx, keypair) in __keypairs.iter().enumerate() { - let key_str = format!("{}", keypair.public()); - if !seen_keys.insert(key_str.clone()) { + if !seen_keys.insert(keypair.public().clone()) { return Err(anyhow::anyhow!( "FATAL: Generated duplicate transport keypair for node {} (key: {}). \ This indicates an RNG issue in the test environment. \ Please report this to the Freenet developers.", idx, - key_str + keypair.public() )); } }