diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index ba64a17c9..5b83206dd 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -84,10 +84,22 @@ pub(crate) async fn request_subscribe( tracing::debug!(%key, "No remote peers available for subscription, checking locally"); if super::has_contract(op_manager, *key).await? { - // We have the contract locally - complete operation immediately - // Following Nacho's suggestion: use notify_node_event to tap into - // result router directly without state transitions - tracing::info!(%key, tx = %id, "Contract available locally, completing subscription via result router"); + // We have the contract locally - register subscription and complete immediately + tracing::info!(%key, tx = %id, "Contract available locally, registering local subscription"); + + // CRITICAL FIX for issue #2001: Register subscriber in DashMap before completing + // Without this, UPDATE operations won't find subscribers for locally-cached contracts + let subscriber = op_manager.ring.connection_manager.own_location(); + if op_manager + .ring + .add_subscriber(key, subscriber.clone()) + .is_err() + { + tracing::error!(%key, tx = %id, "Failed to add local subscriber - max subscribers reached"); + // Continue anyway - client requested subscription and contract is local + } else { + tracing::debug!(%key, tx = %id, subscriber = %subscriber.peer, "Successfully registered local subscriber"); + } // Use notify_node_event to deliver SubscribeResponse directly to client // This avoids the problem with notify_op_change overwriting the operation diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index ad3597a64..f08ea647a 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1148,6 +1148,8 @@ async fn test_multiple_clients_subscription() -> TestResult { } } + tracing::info!("All clients subscribed, proceeding with UPDATE operation"); + // Create a new to-do list by deserializing the current state, adding a task, and serializing it back let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref()) .unwrap_or_else(|_| test_utils::TodoList {