diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 16ad8e538..5b9ffb9f9 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -1096,7 +1096,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re return Ok(()); } - // At least one peer found - forward to network + // At least one peer found - cache locally first, then forward to network let target_peer = target.unwrap(); tracing::debug!( @@ -1104,14 +1104,40 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re %key, target_peer = %target_peer.peer, target_location = ?target_peer.location, - "Forwarding PUT to target peer" + "Caching state locally before forwarding PUT to target peer" + ); + + // Cache the contract state locally before forwarding + // This ensures the publishing node has immediate access to the new state + let updated_value = put_contract( + op_manager, + key, + value.clone(), + related_contracts.clone(), + &contract, + ) + .await + .map_err(|e| { + tracing::error!( + tx = %id, + %key, + error = %e, + "Failed to cache state locally before forwarding PUT" + ); + e + })?; + + tracing::debug!( + tx = %id, + %key, + "Local cache updated, now forwarding PUT to target peer" ); put_op.state = Some(PutState::AwaitingResponse { key, upstream: None, contract: contract.clone(), - state: value.clone(), + state: updated_value.clone(), subscribe, }); @@ -1121,7 +1147,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re sender: own_location, contract, related_contracts, - value, + value: updated_value, htl, target: target_peer, }; diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index e67be5c16..dd870257c 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1011,7 +1011,36 @@ pub(crate) async fn request_update( }; // Normal case: we found a remote target + // Apply the update locally first to ensure the initiating peer has the updated state let id = update_op.id; + + tracing::debug!( + tx = %id, + %key, + target_peer = %target.peer, + "Applying UPDATE locally before forwarding to target peer" + ); + + // Apply update locally - this ensures the initiating peer serves the updated state + // even if the remote UPDATE times out or fails + let updated_value = update_contract(op_manager, key, value.clone(), related_contracts.clone()) + .await + .map_err(|e| { + tracing::error!( + tx = %id, + %key, + error = %e, + "Failed to apply update locally before forwarding UPDATE" + ); + e + })?; + + tracing::debug!( + tx = %id, + %key, + "Local update complete, now forwarding UPDATE to target peer" + ); + if let Some(stats) = &mut update_op.stats { stats.target = Some(target.clone()); } @@ -1026,7 +1055,7 @@ pub(crate) async fn request_update( sender, related_contracts, target, - value, + value: updated_value, // Send the updated value, not the original }; let op = UpdateOp { diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index d4798445d..174138736 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -226,6 +226,17 @@ async fn test_gateway_reconnection() -> TestResult { recv_contract.as_ref().expect("Contract should exist").key(), contract_key ); + if recv_state != wrapped_state { + eprintln!("State mismatch!"); + eprintln!( + "Expected state: {:?}", + String::from_utf8_lossy(wrapped_state.as_ref()) + ); + eprintln!( + "Received state: {:?}", + String::from_utf8_lossy(recv_state.as_ref()) + ); + } assert_eq!(recv_state, wrapped_state); tracing::info!("Initial GET successful"); } diff --git a/tests/test-contract-integration/src/lib.rs b/tests/test-contract-integration/src/lib.rs index c880752dc..5c3f1d61b 100644 --- a/tests/test-contract-integration/src/lib.rs +++ b/tests/test-contract-integration/src/lib.rs @@ -86,7 +86,8 @@ impl ContractInterface for Contract { data: Vec>, ) -> Result, ContractError> { // Deserialize the current state - let mut todo_list: TodoList = match serde_json::from_slice(state.as_ref()) { + let original_state_bytes = state.as_ref(); + let mut todo_list: TodoList = match serde_json::from_slice(original_state_bytes) { Ok(list) => list, Err(e) => return Err(ContractError::Deser(e.to_string())), }; @@ -161,16 +162,23 @@ impl ContractInterface for Contract { } } - // Increment the state version - todo_list.version += 1; - - // Serialize the new state - let new_state = match serde_json::to_vec(&todo_list) { - Ok(bytes) => State::from(bytes), - Err(e) => return Err(ContractError::Other(e.to_string())), - }; - - Ok(UpdateModification::valid(new_state)) + // Check if the state actually changed by comparing serialized forms + let new_state_bytes = + serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?; + let state_changed = original_state_bytes != new_state_bytes.as_slice(); + + // Only increment version if the state actually changed + // This prevents double-incrementing when the same state is merged at multiple peers + if state_changed { + todo_list.version += 1; + // Re-serialize with incremented version + let new_state = + serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?; + Ok(UpdateModification::valid(State::from(new_state))) + } else { + // Reuse already serialized bytes since state didn't change + Ok(UpdateModification::valid(State::from(new_state_bytes))) + } } fn summarize_state(