Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,22 @@ pub(crate) async fn request_subscribe(
tracing::debug!(%key, "No remote peers available for subscription, checking locally");

if super::has_contract(op_manager, *key).await? {
// We have the contract locally - complete operation immediately
// Following Nacho's suggestion: use notify_node_event to tap into
// result router directly without state transitions
tracing::info!(%key, tx = %id, "Contract available locally, completing subscription via result router");
// We have the contract locally - register subscription and complete immediately
tracing::info!(%key, tx = %id, "Contract available locally, registering local subscription");

// CRITICAL FIX for issue #2001: Register subscriber in DashMap before completing
// Without this, UPDATE operations won't find subscribers for locally-cached contracts
let subscriber = op_manager.ring.connection_manager.own_location();
if op_manager
.ring
.add_subscriber(key, subscriber.clone())
.is_err()
{
tracing::error!(%key, tx = %id, "Failed to add local subscriber - max subscribers reached");
// Continue anyway - client requested subscription and contract is local
} else {
tracing::debug!(%key, tx = %id, subscriber = %subscriber.peer, "Successfully registered local subscriber");
}

// Use notify_node_event to deliver SubscribeResponse directly to client
// This avoids the problem with notify_op_change overwriting the operation
Expand Down
2 changes: 2 additions & 0 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,8 @@ async fn test_multiple_clients_subscription() -> TestResult {
}
}

tracing::info!("All clients subscribed, proceeding with UPDATE operation");

// Create a new to-do list by deserializing the current state, adding a task, and serializing it back
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
Expand Down
Loading