diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 37c540d55..cd34fabab 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -74,12 +74,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -524,17 +518,16 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ - "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-link 0.1.3", + "windows-link 0.2.1", ] [[package]] @@ -1302,7 +1295,7 @@ dependencies = [ [[package]] name = "freenet" -version = "0.1.42" +version = "0.1.43" dependencies = [ "aes-gcm", "ahash", diff --git a/apps/freenet-ping/Cargo.toml b/apps/freenet-ping/Cargo.toml index 79a379bde..3963ddeab 100644 --- a/apps/freenet-ping/Cargo.toml +++ b/apps/freenet-ping/Cargo.toml @@ -4,9 +4,10 @@ members = ["contracts/ping", "app", "types"] [workspace.dependencies] # freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] } -freenet-stdlib = { version = "0.1.24" } +freenet-stdlib = { version = "0.1.24" } freenet-ping-types = { path = "types", default-features = false } chrono = { version = "0.4", default-features = false } +clap = "4" testresult = "0.4" [profile.dev.package."*"] diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml index 97ab86cdc..345a15ef2 100644 --- a/apps/freenet-ping/app/Cargo.toml +++ b/apps/freenet-ping/app/Cargo.toml @@ -10,7 +10,7 @@ manual-tests = [] [dependencies] anyhow = "1.0" chrono = { workspace = true, features = ["default"] } -clap = { version = "4.5", features = ["derive"] } +clap = { workspace = true, features = ["derive"] } freenet-stdlib = { version = "0.1.24", features = ["net"] } freenet-ping-types = { path = "../types", features = ["std", "clap"] } futures = "0.3.31" diff --git a/apps/freenet-ping/app/src/ping_client.rs b/apps/freenet-ping/app/src/ping_client.rs index a8396a64f..f4aa8df15 100644 --- a/apps/freenet-ping/app/src/ping_client.rs +++ b/apps/freenet-ping/app/src/ping_client.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; -use chrono::{DateTime, Utc}; +use freenet_ping_types::chrono::{DateTime, Utc}; use freenet_ping_types::{Ping, PingContractOptions}; use freenet_stdlib::client_api::{ ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi, diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 906e36832..e0003c77a 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -19,15 +19,17 @@ use futures::{future::BoxFuture, FutureExt}; use rand::{random, Rng, SeedableRng}; use std::io::{Read, Write}; use std::process::{Child, Command, Stdio}; -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; use std::{ collections::HashSet, io, net::{Ipv4Addr, SocketAddr, TcpListener}, path::{Path, PathBuf}, - sync::Mutex, time::Duration, }; + +/// Global lock to prevent concurrent contract compilation which causes race conditions +static COMPILE_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); use tokio::{select, time::sleep}; use tokio_tungstenite::connect_async; use tracing::{info, span, Instrument, Level}; @@ -36,12 +38,6 @@ use serde::{Deserialize, Serialize}; const TARGET_DIR_VAR: &str = "CARGO_TARGET_DIR"; -pub static RNG: LazyLock> = LazyLock::new(|| { - Mutex::new(rand::rngs::StdRng::from_seed( - *b"0102030405060708090a0b0c0d0e0f10", - )) -}); - #[derive(Debug)] pub struct PresetConfig { pub temp_dir: tempfile::TempDir, @@ -58,7 +54,6 @@ pub fn get_free_socket_addr() -> Result { } #[allow(clippy::too_many_arguments)] -#[allow(clippy::await_holding_lock)] pub async fn base_node_test_config( is_gateway: bool, gateways: Vec, @@ -68,7 +63,13 @@ pub async fn base_node_test_config( base_tmp_dir: Option<&Path>, blocked_addresses: Option>, ) -> Result<(ConfigArgs, PresetConfig)> { - let mut rng = RNG.lock().unwrap(); + // Create RNG seeded from test name for reproducibility while maintaining isolation + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + data_dir_suffix.hash(&mut hasher); + let seed = hasher.finish(); + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + base_node_test_config_with_rng( is_gateway, gateways, @@ -83,7 +84,7 @@ pub async fn base_node_test_config( } #[allow(clippy::too_many_arguments)] -pub async fn base_node_test_config_with_rng( +pub async fn base_node_test_config_with_rng( is_gateway: bool, gateways: Vec, public_port: Option, @@ -91,7 +92,7 @@ pub async fn base_node_test_config_with_rng( data_dir_suffix: &str, base_tmp_dir: Option<&Path>, blocked_addresses: Option>, - rng: &mut rand::rngs::StdRng, + rng: &mut R, ) -> Result<(ConfigArgs, PresetConfig)> { if is_gateway { assert!(public_port.is_some()); @@ -146,13 +147,19 @@ pub async fn base_node_test_config_with_rng( } pub fn gw_config_from_path(port: u16, path: &Path) -> Result { - gw_config_from_path_with_rng(port, path, &mut RNG.lock().unwrap()) + // Create RNG seeded from path for reproducibility while maintaining isolation + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + path.hash(&mut hasher); + let seed = hasher.finish(); + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + gw_config_from_path_with_rng(port, path, &mut rng) } -pub fn gw_config_from_path_with_rng( +pub fn gw_config_from_path_with_rng( port: u16, path: &Path, - rng: &mut rand::rngs::StdRng, + rng: &mut R, ) -> Result { Ok(InlineGwConfig { address: (Ipv4Addr::LOCALHOST, port).into(), @@ -303,6 +310,9 @@ fn find_workspace_root() -> PathBuf { } fn compile_contract(contract_path: &PathBuf) -> anyhow::Result> { + // Acquire lock to prevent concurrent compilations which cause race conditions + let _lock = COMPILE_LOCK.lock().unwrap(); + ensure_target_dir_env(); println!("module path: {contract_path:?}"); let target = std::env::var(TARGET_DIR_VAR) diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index d6676c951..4e4c1e822 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -133,6 +133,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { let path = preset.temp_dir.path().to_path_buf(); (cfg, preset, gw_config_from_path(public_port, &path)?) }; + let ws_api_port_gw = config_gw.ws_api.ws_api_port.unwrap(); // Configure Node1 (blocks Node2) @@ -167,7 +168,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { tracing::info!("Node 1 blocks: {:?}", node2_network_addr); tracing::info!("Node 2 blocks: {:?}", node1_network_addr); - // Free socket resources + // Free socket resources before starting nodes std::mem::drop(network_socket_gw); std::mem::drop(ws_api_port_socket_gw); std::mem::drop(network_socket_node1); @@ -788,17 +789,16 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { /// Standard blocked peers test (baseline) #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "baseline", - initial_wait: Duration::from_secs(10), - operation_timeout: Duration::from_secs(20), + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(45), update_rounds: 3, update_wait: Duration::from_secs(5), - propagation_wait: Duration::from_secs(8), + propagation_wait: Duration::from_secs(15), verbose_logging: false, - check_interval: None, + check_interval: Some(Duration::from_secs(4)), send_refresh_updates: false, send_final_updates: true, subscribe_immediately: false, @@ -808,44 +808,39 @@ async fn test_ping_blocked_peers() -> TestResult { /// Simple blocked peers test #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers_simple() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "simple", - initial_wait: Duration::from_secs(10), - operation_timeout: Duration::from_secs(15), - update_rounds: 1, // Only one round of updates - update_wait: Duration::from_secs(3), - propagation_wait: Duration::from_secs(10), // Longer wait for simpler flow + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(45), + update_rounds: 1, + update_wait: Duration::from_secs(5), + propagation_wait: Duration::from_secs(15), verbose_logging: false, - check_interval: None, + check_interval: Some(Duration::from_secs(4)), send_refresh_updates: false, send_final_updates: false, - subscribe_immediately: true, + subscribe_immediately: false, }) .await } -// Note: Redundant tests (optimized, improved, debug, reliable) were removed -// as they only varied in non-functional aspects like timeouts and logging - /// Solution/reference implementation for blocked peers // TODO-MUST-FIX: WebSocket connection reset during teardown - see issue #2108 // Test passes functionally (PUT/GET/Subscribe/state propagation all work) but // fails with "Connection reset without closing handshake" during cleanup. // Likely a test teardown race rather than functional bug. #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[ignore] async fn test_ping_blocked_peers_solution() -> TestResult { run_blocked_peers_test(BlockedPeersConfig { test_name: "solution", - initial_wait: Duration::from_secs(12), - operation_timeout: Duration::from_secs(25), + initial_wait: Duration::from_secs(25), + operation_timeout: Duration::from_secs(60), update_rounds: 2, - update_wait: Duration::from_secs(4), - propagation_wait: Duration::from_secs(12), + update_wait: Duration::from_secs(6), + propagation_wait: Duration::from_secs(20), verbose_logging: false, - check_interval: Some(Duration::from_secs(3)), // Regular check intervals + check_interval: Some(Duration::from_secs(5)), send_refresh_updates: true, send_final_updates: true, subscribe_immediately: true, diff --git a/apps/freenet-ping/types/Cargo.toml b/apps/freenet-ping/types/Cargo.toml index 949a4da89..608138d17 100644 --- a/apps/freenet-ping/types/Cargo.toml +++ b/apps/freenet-ping/types/Cargo.toml @@ -17,6 +17,6 @@ humantime = "2" humantime-serde = "1" serde = { version = "1", features = ["derive"] } chrono = { workspace = true, features = ["serde"] } -clap = { version = "4", features = ["derive"], optional = true } +clap = { workspace = true, features = ["derive"], optional = true } freenet-stdlib = { workspace = true } diff --git a/crates/core/benches/transport_perf.rs b/crates/core/benches/transport_perf.rs index 701d2299e..21328df99 100644 --- a/crates/core/benches/transport_perf.rs +++ b/crates/core/benches/transport_perf.rs @@ -1284,15 +1284,13 @@ mod experimental_combined { }); // Sender: serialize, encrypt, send to channel - let mut nonce_counter = 0u64; - for _ in 0..PACKET_COUNT { + for nonce_counter in 0u64..PACKET_COUNT as u64 { // Allocate buffer for packet let mut packet = vec![0u8; size + 28]; // +28 for nonce+tag // Create nonce let mut nonce = [0u8; 12]; nonce[4..].copy_from_slice(&nonce_counter.to_le_bytes()); - nonce_counter += 1; // Copy nonce to packet packet[..12].copy_from_slice(&nonce); @@ -1493,24 +1491,19 @@ mod experimental_syscall_batching { let rt = tokio::runtime::Handle::current(); let mut batch = Vec::with_capacity(BATCH_SIZE); - loop { - // Try to receive up to BATCH_SIZE packets - match rt.block_on(rx.recv()) { - Some(packet) => { - batch.push(packet); - // Drain available packets up to batch size - while batch.len() < BATCH_SIZE { - match rx.try_recv() { - Ok(p) => batch.push(p), - Err(_) => break, - } - } - // Send batch - for packet in batch.drain(..) { - socket.send(&packet).unwrap(); - } + // Try to receive up to BATCH_SIZE packets + while let Some(packet) = rt.block_on(rx.recv()) { + batch.push(packet); + // Drain available packets up to batch size + while batch.len() < BATCH_SIZE { + match rx.try_recv() { + Ok(p) => batch.push(p), + Err(_) => break, } - None => break, + } + // Send batch + for packet in batch.drain(..) { + socket.send(&packet).unwrap(); } } }); diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index 9c38ed737..fd1c16fd1 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -203,6 +203,47 @@ pub trait ClientEventsProxy { ) -> BoxFuture<'_, Result<(), ClientError>>; } +/// Helper function to register a subscription listener for GET/PUT operations with auto-subscribe +async fn register_subscription_listener( + op_manager: &OpManager, + key: ContractKey, + client_id: ClientId, + subscription_listener: UnboundedSender, + operation_type: &str, +) -> Result<(), Error> { + tracing::debug!(%client_id, %key, "Registering subscription for {} with auto-subscribe", operation_type); + let register_listener = op_manager + .notify_contract_handler(ContractHandlerEvent::RegisterSubscriberListener { + key, + client_id, + summary: None, // No summary for GET/PUT-based subscriptions + subscriber_listener: subscription_listener, + }) + .await + .inspect_err(|err| { + tracing::error!( + %client_id, %key, + "Register subscriber listener error for {}: {}", operation_type, err + ); + }); + match register_listener { + Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => { + tracing::debug!( + %client_id, %key, + "Subscriber listener registered successfully for {}", operation_type + ); + Ok(()) + } + _ => { + tracing::error!( + %client_id, %key, + "Subscriber listener registration failed for {}", operation_type + ); + Err(Error::Op(OpError::UnexpectedOpState)) + } + } +} + /// Process client events. /// /// # Architecture: Dual-Mode Client Handling @@ -221,7 +262,6 @@ pub async fn client_event_handling( where ClientEv: ClientEventsProxy + Send + 'static, { - // Create RequestRouter for centralized request deduplication let request_router = Some(std::sync::Arc::new(crate::node::RequestRouter::new())); let mut results = FuturesUnordered::new(); loop { @@ -613,39 +653,14 @@ async fn process_open_request( // Register subscription listener if subscribe=true if subscribe { if let Some(subscription_listener) = subscription_listener { - tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe"); - let register_listener = op_manager - .notify_contract_handler( - ContractHandlerEvent::RegisterSubscriberListener { - key: contract_key, - client_id, - summary: None, // No summary for PUT-based subscriptions - subscriber_listener: subscription_listener, - }, - ) - .await - .inspect_err(|err| { - tracing::error!( - %client_id, %contract_key, - "Register subscriber listener error for PUT: {}", err - ); - }); - match register_listener { - Ok( - ContractHandlerEvent::RegisterSubscriberListenerResponse, - ) => { - tracing::debug!( - %client_id, %contract_key, - "Subscriber listener registered successfully for PUT" - ); - } - _ => { - tracing::error!( - %client_id, %contract_key, - "Subscriber listener registration failed for PUT" - ); - } - } + register_subscription_listener( + &op_manager, + contract_key, + client_id, + subscription_listener, + "PUT", + ) + .await?; } else { tracing::warn!(%client_id, %contract_key, "PUT with subscribe=true but no subscription_listener"); } @@ -761,24 +776,28 @@ async fn process_open_request( "Request-Transaction correlation" ); - if let Err(err) = update::request_update(&op_manager, op).await { - tracing::error!("request update error {}", err); + match update::request_update(&op_manager, op).await { + Ok(()) | Err(OpError::StatePushed) => {} + Err(err) => { + tracing::error!("request update error {}", err); - // Notify client of error via result router - let error_response = Err(ErrorKind::OperationError { - cause: format!("UPDATE operation failed: {}", err).into(), - } - .into()); + // Notify client of error via result router + let error_response = Err(ErrorKind::OperationError { + cause: format!("UPDATE operation failed: {}", err) + .into(), + } + .into()); - if let Err(e) = op_manager - .result_router_tx - .send((transaction_id, error_response)) - .await - { - tracing::error!( - "Failed to send UPDATE error to result router: {}. Transaction: {}", - e, transaction_id - ); + if let Err(e) = op_manager + .result_router_tx + .send((transaction_id, error_response)) + .await + { + tracing::error!( + "Failed to send UPDATE error to result router: {}. Transaction: {}", + e, transaction_id + ); + } } } } else { @@ -889,37 +908,14 @@ async fn process_open_request( // Handle subscription for locally found contracts if subscribe { if let Some(subscription_listener) = subscription_listener { - tracing::debug!(%client_id, %key, "Subscribing to locally found contract"); - let register_listener = op_manager - .notify_contract_handler( - ContractHandlerEvent::RegisterSubscriberListener { - key, - client_id, - summary: None, // No summary for GET-based subscriptions - subscriber_listener: subscription_listener, - }, - ) - .await - .inspect_err(|err| { - tracing::error!( - %client_id, %key, - "Register subscriber listener error for local GET: {}", err - ); - }); - match register_listener { - Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => { - tracing::debug!( - %client_id, %key, - "Subscriber listener registered successfully for local GET" - ); - } - _ => { - tracing::error!( - %client_id, %key, - "Subscriber listener registration failed for local GET" - ); - } - } + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "local GET", + ) + .await?; } else { tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); } @@ -1010,6 +1006,22 @@ async fn process_open_request( "Reusing existing GET operation via RequestRouter - client registered for result", ); } + + // Register subscription listener if subscribe=true + if subscribe { + if let Some(subscription_listener) = subscription_listener { + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "network GET", + ) + .await?; + } else { + tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); + } + } } else { tracing::debug!( this_peer = %peer_id, @@ -1050,6 +1062,22 @@ async fn process_open_request( ); } } + + // Register subscription listener if subscribe=true + if subscribe { + if let Some(subscription_listener) = subscription_listener { + register_subscription_listener( + &op_manager, + key, + client_id, + subscription_listener, + "legacy GET", + ) + .await?; + } else { + tracing::warn!(%client_id, %key, "GET with subscribe=true but no subscription_listener"); + } + } } } ContractRequest::Subscribe { key, summary } => { diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index b190e4598..5a8199fec 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -1173,6 +1173,9 @@ async fn handle_pure_network_result( Ok(None) => { tracing::debug!("Network operation returned no result"); } + Err(OpError::StatePushed) => { + return Ok(None); + } Err(e) => { tracing::error!("Network operation failed: {}", e); // TODO: Register error event properly diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index 0a8a444cc..3556fb798 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -16,8 +16,8 @@ use std::{ use dashmap::{DashMap, DashSet}; use either::Either; use freenet_stdlib::prelude::ContractKey; -use parking_lot::RwLock; -use tokio::sync::mpsc; +use parking_lot::{Mutex, RwLock}; +use tokio::sync::{mpsc, oneshot}; use tracing::Instrument; use crate::{ @@ -216,6 +216,9 @@ pub(crate) struct OpManager { pub is_gateway: bool, /// Sub-operation tracking for atomic operation execution sub_op_tracker: SubOperationTracker, + /// Waiters for contract storage notification. + /// Operations can register to be notified when a specific contract is stored. + contract_waiters: Arc>>>>, /// Proximity cache manager for tracking neighbor contract caches pub proximity_cache: Arc, } @@ -233,6 +236,7 @@ impl Clone for OpManager { peer_ready: self.peer_ready.clone(), is_gateway: self.is_gateway, sub_op_tracker: self.sub_op_tracker.clone(), + contract_waiters: self.contract_waiters.clone(), proximity_cache: self.proximity_cache.clone(), } } @@ -303,6 +307,7 @@ impl OpManager { peer_ready, is_gateway, sub_op_tracker, + contract_waiters: Arc::new(Mutex::new(std::collections::HashMap::new())), proximity_cache, }) } @@ -658,6 +663,41 @@ impl OpManager { .add_transaction(peer_addr, *transaction); } } + + /// Register to be notified when a contract is stored. + /// Returns a receiver that will be signaled when the contract is stored. + /// This is used to handle race conditions where a subscription arrives before + /// the contract has been propagated via PUT. + pub fn wait_for_contract(&self, key: ContractKey) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + let mut waiters = self.contract_waiters.lock(); + waiters.entry(key).or_default().push(tx); + rx + } + + /// Notify all waiters that a contract has been stored. + /// Called after successful contract storage in PUT operations. + /// + /// Note: Stale waiters (from timed-out operations) are automatically cleaned up + /// here when we remove all senders for the key. The send() will fail silently + /// for dropped receivers, which is harmless. + pub fn notify_contract_stored(&self, key: &ContractKey) { + let mut waiters = self.contract_waiters.lock(); + if let Some(senders) = waiters.remove(key) { + let count = senders.len(); + for sender in senders { + // Ignore errors if receiver was dropped (e.g., operation timed out) + let _ = sender.send(()); + } + if count > 0 { + tracing::debug!( + %key, + count, + "Notified waiters that contract has been stored" + ); + } + } + } } async fn notify_transaction_timeout( diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index e3abaa9e9..0602fc0f3 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -479,8 +479,7 @@ impl Operation for PutOp { // Start subscription // Note: skip_list is no longer used here as subscriptions handle their own routing - let child_tx = - super::start_subscription_request_internal(op_manager, *id, key, false); + let child_tx = super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); op_manager.ring.seed_contract(key); super::announce_contract_cached(op_manager, &key).await; @@ -758,9 +757,8 @@ impl Operation for PutOp { %key, "starting child subscription for PUT operation" ); - let child_tx = super::start_subscription_request_internal( - op_manager, *id, key, false, - ); + let child_tx = + super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); } else { tracing::warn!( @@ -907,9 +905,7 @@ impl Operation for PutOp { // Start subscription and handle dropped contracts let (dropped_contract, old_subscribers) = { - let child_tx = super::start_subscription_request_internal( - op_manager, *id, key, false, - ); + let child_tx = super::start_subscription_request(op_manager, *id, key); tracing::debug!(tx = %id, %child_tx, "started subscription as child operation"); let result = op_manager.ring.seed_contract(key); super::announce_contract_cached(op_manager, &key).await; @@ -1443,7 +1439,11 @@ async fn put_contract( { Ok(ContractHandlerEvent::PutResponse { new_value: Ok(new_val), - }) => Ok(new_val), + }) => { + // Notify any waiters that this contract has been stored + op_manager.notify_contract_stored(&key); + Ok(new_val) + } Ok(ContractHandlerEvent::PutResponse { new_value: Err(err), }) => { diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 81f234cd8..474d96e94 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -22,6 +22,46 @@ use tokio::time::{sleep, Duration}; const MAX_RETRIES: usize = 10; const LOCAL_FETCH_TIMEOUT_MS: u64 = 1_500; const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25; +/// Timeout for waiting on contract storage notification. +/// Used when a subscription arrives before the contract has been propagated via PUT. +const CONTRACT_WAIT_TIMEOUT_MS: u64 = 2_000; + +/// Wait for a contract to become available, using channel-based notification. +/// +/// This handles the race condition where a subscription arrives before the contract +/// has been propagated via PUT. The flow is: +/// 1. Fast path: check if contract already exists +/// 2. Register notification waiter +/// 3. Check again (handles race between step 1 and 2) +/// 4. Wait for notification or timeout +/// 5. Final verification of actual state +async fn wait_for_contract_with_timeout( + op_manager: &OpManager, + key: ContractKey, + timeout_ms: u64, +) -> Result { + // Fast path - contract already exists + if super::has_contract(op_manager, key).await? { + return Ok(true); + } + + // Register waiter BEFORE second check to avoid race condition + let notifier = op_manager.wait_for_contract(key); + + // Check again - contract may have arrived between first check and registration + if super::has_contract(op_manager, key).await? { + return Ok(true); + } + + // Wait for notification or timeout (we don't care which triggers first) + tokio::select! { + _ = notifier => {} + _ = sleep(Duration::from_millis(timeout_ms)) => {} + }; + + // Always verify actual state - don't trust notification alone + super::has_contract(op_manager, key).await +} fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec { op_manager @@ -309,7 +349,10 @@ async fn complete_local_subscription( key, subscribed: true, }) - .await + .await?; + + op_manager.completed(id); + Ok(()) } pub(crate) struct SubscribeOp { @@ -550,6 +593,63 @@ impl Operation for SubscribeOp { ); } + // Contract not found locally. Wait briefly in case a PUT is in flight. + tracing::debug!( + tx = %id, + %key, + "subscribe: contract not found, waiting for possible in-flight PUT" + ); + + // Wait for contract with timeout (handles race conditions internally) + if wait_for_contract_with_timeout(op_manager, *key, CONTRACT_WAIT_TIMEOUT_MS) + .await? + { + tracing::info!( + tx = %id, + %key, + "subscribe: contract arrived, handling locally" + ); + + // Contract exists, register subscription + if op_manager + .ring + .add_subscriber(key, subscriber.clone(), None) + .is_err() + { + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: false, + }; + return Ok(OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + }); + } + + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: true, + }; + return build_op_result( + self.id, + None, + Some(return_msg), + self.upstream_addr, + ); + } + + // Contract still not found after waiting, try to forward + tracing::debug!( + tx = %id, + %key, + "subscribe: contract not found after waiting, attempting to forward" + ); + let own_addr = own_loc .socket_addr() .expect("own location must have socket address"); @@ -566,9 +666,31 @@ impl Operation for SubscribeOp { .socket_addr() .map(|addr| addr != own_addr) .unwrap_or(false) - }) - .ok_or(RingError::NoCachingPeers(*key)) - .map_err(OpError::from)?; + }); + + // If no forward target available, send ReturnSub(subscribed: false) back + // This allows the subscriber to complete locally if they have the contract + let forward_target = match forward_target { + Some(target) => target, + None => { + tracing::warn!( + tx = %id, + %key, + "subscribe: no forward target available, returning unsubscribed" + ); + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + target: subscriber.clone(), + subscribed: false, + }; + return Ok(OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + }); + } + }; let forward_target_addr = forward_target .socket_addr() @@ -864,6 +986,20 @@ impl Operation for SubscribeOp { retries: retries + 1, }); } else { + // No more candidates - try to complete locally as fallback + if super::has_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "No remote peers, completing subscription locally as fallback" + ); + complete_local_subscription(op_manager, *id, *key).await?; + return Ok(OperationResult { + return_msg: None, + target_addr: None, + state: None, + }); + } return Err(RingError::NoCachingPeers(*key).into()); } new_state = Some(SubscribeState::AwaitingResponse { diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 79cb3673b..b7e68bba2 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -112,14 +112,23 @@ impl std::fmt::Display for TransportPublicKey { use pkcs8::EncodePublicKey; let encoded = self.0.to_public_key_der().map_err(|_| std::fmt::Error)?; - if encoded.as_bytes().len() >= 16 { - let bytes = encoded.as_bytes(); - let first_six = &bytes[..6]; - let last_six = &bytes[bytes.len() - 6..]; - let to_encode = [first_six, last_six].concat(); + let bytes = encoded.as_bytes(); + + // For RSA 2048-bit keys, DER encoding is ~294 bytes: + // - Bytes 0-~22: DER structure headers (same for all 2048-bit keys) + // - Bytes ~23-~279: Modulus (256 bytes of random data) + // - Bytes ~280-~293: Exponent (typically 65537, same for all keys) + // + // To create a short but unique display string, we use 12 bytes from + // the middle of the modulus where the actual random data lives. + if bytes.len() >= 150 { + // Take 12 bytes from the middle of the modulus (around byte 100-112) + let mid_start = 100; + let to_encode = &bytes[mid_start..mid_start + 12]; write!(f, "{}", bs58::encode(to_encode).into_string()) } else { - write!(f, "{}", bs58::encode(encoded.as_bytes()).into_string()) + // Fallback for smaller keys + write!(f, "{}", bs58::encode(bytes).into_string()) } } } @@ -162,3 +171,27 @@ fn key_sizes_and_decryption() { let bytes = pair.secret.decrypt(&encrypted).unwrap(); assert_eq!(bytes, sym_key_bytes.as_slice()); } + +#[cfg(test)] +mod display_uniqueness_tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn display_produces_unique_strings_for_100_keys() { + // Verify the Display impl produces unique strings by checking 100 keys. + // This test was added after discovering the original Display impl only + // had ~1 byte of entropy (see PR #2233 for details). + let mut seen = HashSet::new(); + for i in 0..100 { + let keypair = TransportKeypair::new(); + let display_str = format!("{}", keypair.public()); + assert!( + seen.insert(display_str.clone()), + "Duplicate display string found at key {}: {}", + i, + display_str + ); + } + } +} diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index d3d376ba9..e6aba8038 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -199,17 +199,18 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { .collect(); // Verify all public keys are unique (sanity check against RNG issues) + // Note: We use the full public key for comparison (via its Hash impl), + // NOT the Display string which only captures 12 bytes of the DER encoding. { - let mut seen_keys = std::collections::HashSet::new(); + let mut seen_keys = std::collections::HashSet::::new(); for (idx, keypair) in __keypairs.iter().enumerate() { - let key_str = format!("{}", keypair.public()); - if !seen_keys.insert(key_str.clone()) { + if !seen_keys.insert(keypair.public().clone()) { return Err(anyhow::anyhow!( "FATAL: Generated duplicate transport keypair for node {} (key: {}). \ This indicates an RNG issue in the test environment. \ Please report this to the Freenet developers.", idx, - key_str + keypair.public() )); } }