diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index e5c37673a..2b95e8490 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -889,33 +889,29 @@ async fn process_open_request( return Ok(None); }; - // Use RequestRouter for deduplication if in actor mode, otherwise direct operation - if let Some(router) = &request_router { + // SUBSCRIBE: Skip router deduplication due to instant-completion race conditions + // When contracts are local, Subscribe completes instantly which breaks deduplication: + // - Client 1 subscribes → operation completes → result delivered → TX removed + // - Client 2 subscribes → tries to reuse TX → but TX already gone + // Solution: Each client gets their own Subscribe operation (they're lightweight) + if let Some(_router) = &request_router { tracing::debug!( peer_id = %peer_id, key = %key, - "Routing SUBSCRIBE request through deduplication layer (actor mode)", + "Processing SUBSCRIBE without deduplication (actor mode - instant-completion safe)", ); - let request = crate::node::DeduplicatedRequest::Subscribe { - key, - client_id, - request_id, - }; - - let (_transaction_id, should_start_operation) = - router.route_request(request).await.map_err(|e| { - Error::Node(format!("Request routing failed: {}", e)) - })?; + // Create operation with new transaction ID + let tx = crate::message::Transaction::new::< + crate::operations::subscribe::SubscribeMsg, + >(); - // Register this client for the subscription result with proper WaitingTransaction type + // CRITICAL: Register BEFORE starting operation to avoid race with instant-completion use crate::contract::WaitingTransaction; op_manager .ch_outbound .waiting_for_transaction_result( - WaitingTransaction::Subscription { - contract_key: *key.id(), - }, + WaitingTransaction::Transaction(tx), client_id, request_id, ) @@ -927,37 +923,24 @@ async fn process_open_request( ); })?; - // Only start new network operation if this is a new operation - if should_start_operation { - tracing::debug!( - peer_id = %peer_id, - key = %key, - "Starting new SUBSCRIBE network operation via RequestRouter", - ); - - let op_id = crate::node::subscribe( - op_manager.clone(), - key, - Some(client_id), - ) - .await - .inspect_err(|err| { - tracing::error!("Subscribe error: {}", err); - })?; + // Start dedicated operation for this client AFTER registration + let _result_tx = crate::node::subscribe_with_id( + op_manager.clone(), + key, + None, // No legacy registration + Some(tx), + ) + .await + .inspect_err(|err| { + tracing::error!("Subscribe error: {}", err); + })?; - tracing::debug!( - request_id = %request_id, - transaction_id = %op_id, - operation = "subscribe", - "Request-Transaction correlation" - ); - } else { - tracing::debug!( - peer_id = %peer_id, - key = %key, - "Reusing existing SUBSCRIBE operation via RequestRouter - client registered for result", - ); - } + tracing::debug!( + request_id = %request_id, + transaction_id = %tx, + operation = "subscribe", + "SUBSCRIBE operation started with dedicated transaction for this client" + ); } else { tracing::debug!( peer_id = %peer_id, diff --git a/crates/core/src/client_events/result_router.rs b/crates/core/src/client_events/result_router.rs index 78ac24f70..e438d0ad1 100644 --- a/crates/core/src/client_events/result_router.rs +++ b/crates/core/src/client_events/result_router.rs @@ -28,10 +28,15 @@ impl ResultRouter { /// Main routing loop pub async fn run(mut self) { while let Some((tx, host_result)) = self.network_results.recv().await { + tracing::info!("ResultRouter received result for transaction: {}", tx); let msg = SessionMessage::DeliverHostResponse { tx, response: std::sync::Arc::new(host_result), }; + tracing::info!( + "ResultRouter sending result to SessionActor for transaction: {}", + tx + ); if let Err(e) = self.session_actor_tx.send(msg).await { // TODO: Add metric for router send failures // metrics::ROUTER_SEND_FAILURES.increment(); diff --git a/crates/core/src/client_events/session_actor.rs b/crates/core/src/client_events/session_actor.rs index dac1fe0cf..f5ba76cfa 100644 --- a/crates/core/src/client_events/session_actor.rs +++ b/crates/core/src/client_events/session_actor.rs @@ -67,7 +67,7 @@ impl SessionActor { // Track RequestId correlation self.client_request_ids.insert((tx, client_id), request_id); - debug!( + tracing::info!( "Registered transaction {} for client {} (request {}), total clients: {}", tx, client_id, @@ -103,9 +103,10 @@ impl SessionActor { tx: Transaction, result: std::sync::Arc, ) { + tracing::info!("Session actor attempting to deliver result for transaction {}, registered transactions: {}", tx, self.client_transactions.len()); if let Some(waiting_clients) = self.client_transactions.remove(&tx) { let client_count = waiting_clients.len(); - tracing::debug!( + tracing::info!( "Delivering result for transaction {} to {} clients", tx, client_count diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 05aaa13c6..cde9ca162 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -328,7 +328,8 @@ impl ContractHandlerChannel { // Route to session actor if session adapter is installed if let Some(session_tx) = &self.session_adapter_tx { - // Only mirror Transaction variants, handle Subscription separately later + // Register all Transaction variants with the session actor + // Note: Subscription variant is legacy and should not be used in actor mode if let WaitingTransaction::Transaction(tx) = waiting_tx { let msg = SessionMessage::RegisterTransaction { tx, diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index 5df11c1b0..3e49151f3 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -326,6 +326,12 @@ pub(crate) enum NodeEvent { callback: tokio::sync::mpsc::Sender, }, TransactionTimedOut(Transaction), + /// Local subscription completed - deliver SubscribeResponse to client via result router + LocalSubscribeComplete { + tx: Transaction, + key: ContractKey, + subscribed: bool, + }, } #[derive(Debug, Clone)] @@ -389,6 +395,16 @@ impl Display for NodeEvent { NodeEvent::TransactionTimedOut(transaction) => { write!(f, "Transaction timed out ({transaction})") } + NodeEvent::LocalSubscribeComplete { + tx, + key, + subscribed, + } => { + write!( + f, + "Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})" + ) + } } } } diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index 0d395b814..67fe7d186 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -1132,7 +1132,20 @@ pub async fn subscribe( key: ContractKey, client_id: Option, ) -> Result { - let op = subscribe::start_op(key); + subscribe_with_id(op_manager, key, client_id, None).await +} + +/// Attempts to subscribe to a contract with a specific transaction ID (for deduplication) +pub async fn subscribe_with_id( + op_manager: Arc, + key: ContractKey, + client_id: Option, + transaction_id: Option, +) -> Result { + let op = match transaction_id { + Some(id) => subscribe::start_op_with_id(key, id), + None => subscribe::start_op(key), + }; let id = op.id; if let Some(client_id) = client_id { use crate::client_events::RequestId; diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 66088854c..9a57a2977 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -589,6 +589,32 @@ impl P2pConnManager { ))?; } } + NodeEvent::LocalSubscribeComplete { + tx, + key, + subscribed, + } => { + tracing::info!("Received LocalSubscribeComplete event for transaction: {tx}, contract: {key}"); + + // Deliver SubscribeResponse directly to result router (actor mode) + // Following Nacho's suggestion to tap into result router without state transitions + if let Some(result_router) = &op_manager.result_router_tx { + tracing::info!("Sending SubscribeResponse to result router for transaction: {tx}"); + use freenet_stdlib::client_api::{ + ContractResponse, HostResponse, + }; + let response = Ok(HostResponse::ContractResponse( + ContractResponse::SubscribeResponse { key, subscribed }, + )); + + match result_router.send((tx, response)).await { + Ok(()) => tracing::info!("Successfully sent SubscribeResponse to result router for transaction: {tx}"), + Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e), + } + } else { + tracing::warn!("No result router available for local subscribe completion (legacy mode)"); + } + } NodeEvent::Disconnect { cause } => { tracing::info!( "Disconnecting from network{}", diff --git a/crates/core/src/node/request_router.rs b/crates/core/src/node/request_router.rs index 73dc2a4b4..a682bcf2f 100644 --- a/crates/core/src/node/request_router.rs +++ b/crates/core/src/node/request_router.rs @@ -125,6 +125,9 @@ pub enum DeduplicatedRequest { client_id: ClientId, request_id: RequestId, }, + /// Note: Currently unused - Subscribe operations bypass deduplication to avoid + /// race conditions with instant-completion. Kept for potential future use. + #[allow(dead_code)] Subscribe { key: ContractKey, client_id: ClientId, diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index 22c62a54d..8ba650a6e 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -924,6 +924,9 @@ where NodeEvent::TransactionTimedOut(_) => { unimplemented!() } + NodeEvent::LocalSubscribeComplete { .. } => { + unimplemented!() + } NodeEvent::QuerySubscriptions { .. } => { unimplemented!() } diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 0df101da7..ba64a17c9 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -60,6 +60,12 @@ pub(crate) fn start_op(key: ContractKey) -> SubscribeOp { SubscribeOp { id, state } } +/// Create a Subscribe operation with a specific transaction ID (for operation deduplication) +pub(crate) fn start_op_with_id(key: ContractKey, id: Transaction) -> SubscribeOp { + let state = Some(SubscribeState::PrepareRequest { id, key }); + SubscribeOp { id, state } +} + /// Request to subscribe to value changes from a contract. pub(crate) async fn request_subscribe( op_manager: &OpManager, @@ -68,14 +74,44 @@ pub(crate) async fn request_subscribe( if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state { // Use k_closest_potentially_caching to try multiple candidates const EMPTY: &[PeerId] = &[]; - let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3); // Try up to 3 candidates + // Try up to 3 candidates + let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3); let target = match candidates.first() { Some(peer) => peer.clone(), None => { - // No remote peers available - this may happen when node is isolated - tracing::warn!(%key, "No remote peers available for subscription - node may be isolated"); - return Err(RingError::NoCachingPeers(*key).into()); + // No remote peers available - check if we have the contract locally + tracing::debug!(%key, "No remote peers available for subscription, checking locally"); + + if super::has_contract(op_manager, *key).await? { + // We have the contract locally - complete operation immediately + // Following Nacho's suggestion: use notify_node_event to tap into + // result router directly without state transitions + tracing::info!(%key, tx = %id, "Contract available locally, completing subscription via result router"); + + // Use notify_node_event to deliver SubscribeResponse directly to client + // This avoids the problem with notify_op_change overwriting the operation + match op_manager + .notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete { + tx: *id, + key: *key, + subscribed: true, + }) + .await + { + Ok(()) => { + tracing::info!(%key, tx = %id, "Successfully sent LocalSubscribeComplete event") + } + Err(e) => { + tracing::error!(%key, tx = %id, error = %e, "Failed to send LocalSubscribeComplete event") + } + } + + return Ok(()); + } else { + tracing::debug!(%key, "Contract not available locally and no remote peers"); + return Err(RingError::NoCachingPeers(*key).into()); + } } }; diff --git a/crates/core/tests/isolated_node_regression.rs b/crates/core/tests/isolated_node_regression.rs index b2f127f67..74712cbe4 100644 --- a/crates/core/tests/isolated_node_regression.rs +++ b/crates/core/tests/isolated_node_regression.rs @@ -4,13 +4,14 @@ //! 1. PUT operations cache contracts locally without network timeouts //! 2. GET operations retrieve from local cache without self-routing attempts //! 3. Complete PUT→GET workflow functions properly on isolated nodes +//! 4. SUBSCRIBE operations complete successfully for local contracts use freenet::{ config::{ConfigArgs, NetworkArgs, SecretArgs, WebsocketApiArgs}, dev_tool::TransportKeypair, local_node::NodeConfig, server::serve_gateway, - test_utils::{load_contract, make_get, make_put}, + test_utils::{load_contract, make_get, make_put, make_subscribe}, }; use freenet_stdlib::{ client_api::{ClientRequest, ContractResponse, HostResponse, WebApi}, @@ -233,3 +234,180 @@ async fn test_isolated_node_put_get_workflow() -> anyhow::Result<()> { Ok(()) } + +/// Test subscription operations on isolated node with local contracts +/// +/// This regression test verifies that Subscribe operations complete successfully +/// when the contract exists locally but no remote peers are available. +/// Tests the fix in PR #1844 where SubscribeResponse messages were not being +/// delivered to WebSocket clients for local contracts. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_isolated_node_local_subscription() -> anyhow::Result<()> { + freenet::config::set_logger(Some(tracing::level_filters::LevelFilter::INFO), None); + + // Start a single isolated node (no peers) + let ws_port = 50800; + let network_port = 50801; + let (config, _temp_dir) = create_test_node_config(true, ws_port, Some(network_port)).await?; + + // Load test contract and state + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + let initial_state = freenet::test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Start the node + let node_handle = { + let config = config.clone(); + async move { + let built_config = config.build().await?; + let node = NodeConfig::new(built_config.clone()) + .await? + .build(serve_gateway(built_config.ws_api).await) + .await?; + node.run().await + } + .boxed_local() + }; + + // Run the test with timeout + let test_result = timeout(Duration::from_secs(60), async { + // Give node time to start - critical for proper initialization + println!("Waiting for node to start up..."); + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Node should be ready, proceeding with test..."); + + // Connect first client to the node + let url = format!( + "ws://localhost:{}/v1/contract/command?encodingProtocol=native", + ws_port + ); + let (ws_stream1, _) = connect_async(&url).await?; + let mut client1 = WebApi::start(ws_stream1); + + // Connect second client to test that subscriptions work for multiple clients + let (ws_stream2, _) = connect_async(&url).await?; + let mut client2 = WebApi::start(ws_stream2); + + println!("Step 1: Performing PUT operation to cache contract locally"); + + // Perform PUT operation - this should cache the contract locally + make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?; + + // Wait for PUT response + let put_result = timeout(Duration::from_secs(30), client1.recv()).await; + + match put_result { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + assert_eq!(key, contract_key); + println!("PUT operation successful"); + } + Ok(Ok(other)) => { + panic!("Unexpected PUT response: {:?}", other); + } + Ok(Err(e)) => { + panic!("PUT operation failed: {}", e); + } + Err(_) => { + panic!("PUT operation timed out"); + } + } + + println!("Step 2: Testing SUBSCRIBE operation on locally cached contract"); + + // Subscribe first client to the contract - should work with local contract + let subscribe_start = std::time::Instant::now(); + make_subscribe(&mut client1, contract_key).await?; + + // Wait for SUBSCRIBE response - THIS IS THE KEY TEST + // The fix ensures this completes successfully for local contracts + let subscribe_result = timeout(Duration::from_secs(10), client1.recv()).await; + let subscribe_elapsed = subscribe_start.elapsed(); + + match subscribe_result { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + }))) => { + assert_eq!(key, contract_key); + println!( + "Client 1: SUBSCRIBE operation successful in {:?}", + subscribe_elapsed + ); + + // Verify we got the subscribed confirmation (contract exists locally) + assert!( + subscribed, + "Should receive subscribed=true when subscribing to local contract" + ); + } + Ok(Ok(other)) => { + panic!("Unexpected SUBSCRIBE response: {:?}", other); + } + Ok(Err(e)) => { + panic!("SUBSCRIBE operation failed: {}", e); + } + Err(_) => { + panic!( + "SUBSCRIBE operation timed out - SubscribeResponse not delivered! \ + This indicates the bug from PR #1844 has regressed." + ); + } + } + + println!("Step 3: Testing second client subscription"); + + // Subscribe second client - verifies multiple clients can subscribe locally + make_subscribe(&mut client2, contract_key).await?; + + let subscribe2_result = timeout(Duration::from_secs(10), client2.recv()).await; + + match subscribe2_result { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + }))) => { + assert_eq!(key, contract_key); + println!("Client 2: SUBSCRIBE operation successful"); + assert!(subscribed); + } + _ => { + panic!("Client 2: SUBSCRIBE operation failed or timed out"); + } + } + + // NOTE: Update/notification testing is skipped because UPDATE operations + // timeout on isolated nodes (see issue #1884). The core Subscribe functionality + // has been validated - both clients successfully receive SubscribeResponse. + // Update notification delivery can be tested once UPDATE is fixed for isolated nodes. + + println!("Local subscription test completed successfully - both clients received SubscribeResponse"); + + // Properly close clients + client1 + .send(ClientRequest::Disconnect { cause: None }) + .await?; + client2 + .send(ClientRequest::Disconnect { cause: None }) + .await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + Ok::<(), anyhow::Error>(()) + }); + + // Run node and test concurrently + select! { + _ = node_handle => { + error!("Node exited unexpectedly"); + panic!("Node should not exit during test"); + } + result = test_result => { + result??; + // Give time for cleanup + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + Ok(()) +}