From e7a19f24064a413a3705a19738a4521fad482284 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 28 Oct 2025 01:24:43 +0100 Subject: [PATCH 1/5] fix: cache contract state locally before forwarding client-initiated PUT to remote peer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client publishes a contract update via `fdev publish`, the local node now caches the new state before forwarding to the optimal target peer. This ensures the publishing node serves the updated state immediately, even if the remote PUT times out. Fixes #2010 [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/src/operations/put.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 16ad8e538..be328141b 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -1096,7 +1096,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re return Ok(()); } - // At least one peer found - forward to network + // At least one peer found - cache locally first, then forward to network let target_peer = target.unwrap(); tracing::debug!( @@ -1104,14 +1104,31 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re %key, target_peer = %target_peer.peer, target_location = ?target_peer.location, - "Forwarding PUT to target peer" + "Caching state locally before forwarding PUT to target peer" + ); + + // Cache the contract state locally before forwarding + // This ensures the publishing node has immediate access to the new state + let updated_value = put_contract( + op_manager, + key, + value.clone(), + related_contracts.clone(), + &contract, + ) + .await?; + + tracing::debug!( + tx = %id, + %key, + "Local cache updated, now forwarding PUT to target peer" ); put_op.state = Some(PutState::AwaitingResponse { key, upstream: None, contract: contract.clone(), - state: value.clone(), + state: updated_value.clone(), subscribe, }); @@ -1121,7 +1138,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re sender: own_location, contract, related_contracts, - value, + value: updated_value, htl, target: target_peer, }; From 5eca27387fbfce4fbb96357054c77b5d57638c1a Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 28 Oct 2025 01:47:12 +0100 Subject: [PATCH 2/5] fix: test contract should not increment version for full state replacements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a contract's update_state() receives UpdateData::State (a full state replacement), it should not increment the version counter because the incoming state already has its own version. This prevents double-incrementing when the same state is merged at multiple peers. This fixes the test_gateway_reconnection test failure caused by the previous commit which correctly caches state locally before forwarding client-initiated PUTs to remote peers. [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/tests/connectivity.rs | 11 +++++++++++ tests/test-contract-integration/src/lib.rs | 12 ++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index d4798445d..174138736 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -226,6 +226,17 @@ async fn test_gateway_reconnection() -> TestResult { recv_contract.as_ref().expect("Contract should exist").key(), contract_key ); + if recv_state != wrapped_state { + eprintln!("State mismatch!"); + eprintln!( + "Expected state: {:?}", + String::from_utf8_lossy(wrapped_state.as_ref()) + ); + eprintln!( + "Received state: {:?}", + String::from_utf8_lossy(recv_state.as_ref()) + ); + } assert_eq!(recv_state, wrapped_state); tracing::info!("Initial GET successful"); } diff --git a/tests/test-contract-integration/src/lib.rs b/tests/test-contract-integration/src/lib.rs index c880752dc..553263954 100644 --- a/tests/test-contract-integration/src/lib.rs +++ b/tests/test-contract-integration/src/lib.rs @@ -92,9 +92,11 @@ impl ContractInterface for Contract { }; // Process each update operation + let mut had_delta_updates = false; for update in data { match update { UpdateData::Delta(delta) => { + had_delta_updates = true; let operation: TodoOperation = match serde_json::from_slice(delta.as_ref()) { Ok(op) => op, Err(e) => return Err(ContractError::Deser(e.to_string())), @@ -156,13 +158,19 @@ impl ContractInterface for Contract { return Err(ContractError::InvalidUpdate); } } + + // For full state replacements, don't increment version + // The incoming state already has its own version } _ => return Err(ContractError::InvalidUpdate), } } - // Increment the state version - todo_list.version += 1; + // Increment the state version for delta updates only + // Full state replacements already have their version set + if had_delta_updates { + todo_list.version += 1; + } // Serialize the new state let new_state = match serde_json::to_vec(&todo_list) { From 21c6b1a0ad06e668bfb45165a7435ac09867999c Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 28 Oct 2025 02:04:20 +0100 Subject: [PATCH 3/5] fix: only increment version when state actually changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix prevented version increment for full state replacements, but this broke the test_update_contract test which expects version to increment when applying updates. The correct behavior is to only increment the version when the state actually changes. This is detected by comparing the serialized state before and after the update operation. This approach: - Prevents double-incrementing when the same state is merged at multiple peers - Allows version to increment for actual state changes (updates) - Works correctly for both PUT and UPDATE operations [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- tests/test-contract-integration/src/lib.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/test-contract-integration/src/lib.rs b/tests/test-contract-integration/src/lib.rs index 553263954..7504c5fc4 100644 --- a/tests/test-contract-integration/src/lib.rs +++ b/tests/test-contract-integration/src/lib.rs @@ -86,17 +86,16 @@ impl ContractInterface for Contract { data: Vec>, ) -> Result, ContractError> { // Deserialize the current state - let mut todo_list: TodoList = match serde_json::from_slice(state.as_ref()) { + let original_state_bytes = state.as_ref(); + let mut todo_list: TodoList = match serde_json::from_slice(original_state_bytes) { Ok(list) => list, Err(e) => return Err(ContractError::Deser(e.to_string())), }; // Process each update operation - let mut had_delta_updates = false; for update in data { match update { UpdateData::Delta(delta) => { - had_delta_updates = true; let operation: TodoOperation = match serde_json::from_slice(delta.as_ref()) { Ok(op) => op, Err(e) => return Err(ContractError::Deser(e.to_string())), @@ -158,17 +157,19 @@ impl ContractInterface for Contract { return Err(ContractError::InvalidUpdate); } } - - // For full state replacements, don't increment version - // The incoming state already has its own version } _ => return Err(ContractError::InvalidUpdate), } } - // Increment the state version for delta updates only - // Full state replacements already have their version set - if had_delta_updates { + // Check if the state actually changed by comparing serialized forms + let new_state_bytes = + serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?; + let state_changed = original_state_bytes != new_state_bytes.as_slice(); + + // Only increment version if the state actually changed + // This prevents double-incrementing when the same state is merged at multiple peers + if state_changed { todo_list.version += 1; } From 938c65153400777a33e5ae3a1bb9208f2db8601c Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 28 Oct 2025 02:23:50 +0100 Subject: [PATCH 4/5] fix: apply UPDATE locally before forwarding to remote peer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Similar to the PUT fix, when a client initiates an UPDATE and the target peer is remote, the initiating peer now applies the update locally before forwarding. This ensures the initiating peer serves the updated state immediately, even if the remote UPDATE times out or fails. ### Changes - Modified request_update() in update.rs to call update_contract() before forwarding to the target peer - The updated (merged) value is forwarded, not the original value - Added logging to trace local update and forwarding steps ### Impact - Fixes UPDATE operations initiated via WebSocket API when running in network mode - Ensures publishing node has immediate access to updated state - Mirrors the PUT fix from the previous commit [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/src/operations/update.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index e67be5c16..de83f3834 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1011,7 +1011,27 @@ pub(crate) async fn request_update( }; // Normal case: we found a remote target + // Apply the update locally first to ensure the initiating peer has the updated state let id = update_op.id; + + tracing::debug!( + tx = %id, + %key, + target_peer = %target.peer, + "Applying UPDATE locally before forwarding to target peer" + ); + + // Apply update locally - this ensures the initiating peer serves the updated state + // even if the remote UPDATE times out or fails + let updated_value = + update_contract(op_manager, key, value.clone(), related_contracts.clone()).await?; + + tracing::debug!( + tx = %id, + %key, + "Local update complete, now forwarding UPDATE to target peer" + ); + if let Some(stats) = &mut update_op.stats { stats.target = Some(target.clone()); } @@ -1026,7 +1046,7 @@ pub(crate) async fn request_update( sender, related_contracts, target, - value, + value: updated_value, // Send the updated value, not the original }; let op = UpdateOp { From c60e3622e7772ef1659ad3b6fa7dfc9e77eca311 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 28 Oct 2025 02:44:09 +0100 Subject: [PATCH 5/5] perf: improve error handling and reduce redundant serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Based on Copilot's review comments on PR #2011: 1. Fixed redundant serialization in test contract (lib.rs:167-177): - Only serialize once when state doesn't change (reuse bytes) - Only re-serialize when version increments after actual state change - Improves performance by avoiding unnecessary serialization 2. Added error context to PUT operation (put.rs:1112): - Wrap put_contract() errors with descriptive logging - Helps identify failures during local caching before forwarding 3. Added error context to UPDATE operation (update.rs:1026): - Wrap update_contract() errors with descriptive logging - Helps identify failures during local update before forwarding All connectivity tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/src/operations/put.rs | 11 ++++++++++- crates/core/src/operations/update.rs | 13 +++++++++++-- tests/test-contract-integration/src/lib.rs | 15 +++++++-------- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index be328141b..5b9ffb9f9 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -1116,7 +1116,16 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re related_contracts.clone(), &contract, ) - .await?; + .await + .map_err(|e| { + tracing::error!( + tx = %id, + %key, + error = %e, + "Failed to cache state locally before forwarding PUT" + ); + e + })?; tracing::debug!( tx = %id, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index de83f3834..dd870257c 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1023,8 +1023,17 @@ pub(crate) async fn request_update( // Apply update locally - this ensures the initiating peer serves the updated state // even if the remote UPDATE times out or fails - let updated_value = - update_contract(op_manager, key, value.clone(), related_contracts.clone()).await?; + let updated_value = update_contract(op_manager, key, value.clone(), related_contracts.clone()) + .await + .map_err(|e| { + tracing::error!( + tx = %id, + %key, + error = %e, + "Failed to apply update locally before forwarding UPDATE" + ); + e + })?; tracing::debug!( tx = %id, diff --git a/tests/test-contract-integration/src/lib.rs b/tests/test-contract-integration/src/lib.rs index 7504c5fc4..5c3f1d61b 100644 --- a/tests/test-contract-integration/src/lib.rs +++ b/tests/test-contract-integration/src/lib.rs @@ -171,15 +171,14 @@ impl ContractInterface for Contract { // This prevents double-incrementing when the same state is merged at multiple peers if state_changed { todo_list.version += 1; + // Re-serialize with incremented version + let new_state = + serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?; + Ok(UpdateModification::valid(State::from(new_state))) + } else { + // Reuse already serialized bytes since state didn't change + Ok(UpdateModification::valid(State::from(new_state_bytes))) } - - // Serialize the new state - let new_state = match serde_json::to_vec(&todo_list) { - Ok(bytes) => State::from(bytes), - Err(e) => return Err(ContractError::Other(e.to_string())), - }; - - Ok(UpdateModification::valid(new_state)) } fn summarize_state(