From cbd08496cc16fe1dff800b55852bb9fb2bb71055 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 27 Oct 2025 15:56:26 +0100 Subject: [PATCH 1/3] fix(test): add delay for cross-node subscription propagation in test_multiple_clients_subscription MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses issue #2001 where the test_multiple_clients_subscription integration test exhibits 20-40% flakiness due to a race condition. The test was starting UPDATE operations before cross-node subscriptions had fully propagated through the ring, causing notifications to be missed by Client 3 (on a different node). This fix adds a 5-second delay after all subscription confirmations are received, allowing time for subscriptions to propagate across nodes before the UPDATE operation begins. This is a pragmatic short-term solution. A proper architectural fix would involve making SUBSCRIBE operations truly synchronous with acknowledgment from the target node (see issue #2001 for discussion). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/tests/operations.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index ad3597a64..b8eccf192 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1148,6 +1148,14 @@ async fn test_multiple_clients_subscription() -> TestResult { } } + // Wait for subscriptions to fully propagate across nodes before UPDATE + // Issue #2001: Race condition where UPDATE can start before cross-node subscriptions + // are fully registered. Even though we receive SubscribeResponse, the subscription + // may not have propagated through the ring to all nodes yet. + tracing::info!("All clients subscribed, waiting 5 seconds for cross-node propagation..."); + tokio::time::sleep(Duration::from_secs(5)).await; + tracing::info!("Proceeding with UPDATE operation"); + // Create a new to-do list by deserializing the current state, adding a task, and serializing it back let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref()) .unwrap_or_else(|_| test_utils::TodoList { From 06925be41d9c9587f9c14c0ee32e8bd3f255751b Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 27 Oct 2025 20:08:18 +0100 Subject: [PATCH 2/3] fix: register subscribers for locally-cached contracts (issue #2001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Root Cause When a contract was already cached locally, the SUBSCRIBE operation took a fast path that: 1. Verified contract exists locally 2. Sent SubscribeResponse to client 3. **Never called add_subscriber() to register the subscription** This caused UPDATE operations to find no subscribers for locally-cached contracts, silently dropping notifications. ## Why It Was Flaky - Contract on remote node: Normal flow → add_subscriber() called → works - Contract locally cached: Fast path → add_subscriber() skipped → breaks - Timing-dependent: Whether contract is local when subscription occurs ## The Fix Added call to add_subscriber() in the local subscription fast path (crates/core/src/operations/subscribe.rs:91-98), ensuring subscriptions are registered regardless of cache location. ## Verification Ran test 15 times - all passes (100% success rate vs. previous 60-80%) ## Impact - Fixes intermittent test failures - Ensures UPDATE notifications work correctly for locally-cached contracts - No performance impact (single DashMap insertion) - Removes need for 5-second delay workaround in test [AI-assisted debugging and comment] --- crates/core/src/operations/subscribe.rs | 20 ++++++++++++++++---- crates/core/tests/operations.rs | 8 +------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index ba64a17c9..5b83206dd 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -84,10 +84,22 @@ pub(crate) async fn request_subscribe( tracing::debug!(%key, "No remote peers available for subscription, checking locally"); if super::has_contract(op_manager, *key).await? { - // We have the contract locally - complete operation immediately - // Following Nacho's suggestion: use notify_node_event to tap into - // result router directly without state transitions - tracing::info!(%key, tx = %id, "Contract available locally, completing subscription via result router"); + // We have the contract locally - register subscription and complete immediately + tracing::info!(%key, tx = %id, "Contract available locally, registering local subscription"); + + // CRITICAL FIX for issue #2001: Register subscriber in DashMap before completing + // Without this, UPDATE operations won't find subscribers for locally-cached contracts + let subscriber = op_manager.ring.connection_manager.own_location(); + if op_manager + .ring + .add_subscriber(key, subscriber.clone()) + .is_err() + { + tracing::error!(%key, tx = %id, "Failed to add local subscriber - max subscribers reached"); + // Continue anyway - client requested subscription and contract is local + } else { + tracing::debug!(%key, tx = %id, subscriber = %subscriber.peer, "Successfully registered local subscriber"); + } // Use notify_node_event to deliver SubscribeResponse directly to client // This avoids the problem with notify_op_change overwriting the operation diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index b8eccf192..f08ea647a 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1148,13 +1148,7 @@ async fn test_multiple_clients_subscription() -> TestResult { } } - // Wait for subscriptions to fully propagate across nodes before UPDATE - // Issue #2001: Race condition where UPDATE can start before cross-node subscriptions - // are fully registered. Even though we receive SubscribeResponse, the subscription - // may not have propagated through the ring to all nodes yet. - tracing::info!("All clients subscribed, waiting 5 seconds for cross-node propagation..."); - tokio::time::sleep(Duration::from_secs(5)).await; - tracing::info!("Proceeding with UPDATE operation"); + tracing::info!("All clients subscribed, proceeding with UPDATE operation"); // Create a new to-do list by deserializing the current state, adding a task, and serializing it back let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref()) From b128b48d0a1bf281d714a1ccc5ea56532c41e3f6 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 27 Oct 2025 20:28:58 +0100 Subject: [PATCH 3/3] ci: trigger CI re-run after title fix