Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1096,22 +1096,48 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
return Ok(());
}

// At least one peer found - forward to network
// At least one peer found - cache locally first, then forward to network
let target_peer = target.unwrap();

tracing::debug!(
tx = %id,
%key,
target_peer = %target_peer.peer,
target_location = ?target_peer.location,
"Forwarding PUT to target peer"
"Caching state locally before forwarding PUT to target peer"
);

// Cache the contract state locally before forwarding
// This ensures the publishing node has immediate access to the new state
let updated_value = put_contract(
op_manager,
key,
value.clone(),
related_contracts.clone(),
&contract,
)
.await
.map_err(|e| {
tracing::error!(
tx = %id,
%key,
error = %e,
"Failed to cache state locally before forwarding PUT"
);
e
})?;

tracing::debug!(
tx = %id,
%key,
"Local cache updated, now forwarding PUT to target peer"
);

put_op.state = Some(PutState::AwaitingResponse {
key,
upstream: None,
contract: contract.clone(),
state: value.clone(),
state: updated_value.clone(),
subscribe,
});

Expand All @@ -1121,7 +1147,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
sender: own_location,
contract,
related_contracts,
value,
value: updated_value,
htl,
target: target_peer,
};
Expand Down
31 changes: 30 additions & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,36 @@ pub(crate) async fn request_update(
};

// Normal case: we found a remote target
// Apply the update locally first to ensure the initiating peer has the updated state
let id = update_op.id;

tracing::debug!(
tx = %id,
%key,
target_peer = %target.peer,
"Applying UPDATE locally before forwarding to target peer"
);

// Apply update locally - this ensures the initiating peer serves the updated state
// even if the remote UPDATE times out or fails
let updated_value = update_contract(op_manager, key, value.clone(), related_contracts.clone())
.await
.map_err(|e| {
tracing::error!(
tx = %id,
%key,
error = %e,
"Failed to apply update locally before forwarding UPDATE"
);
e
})?;

tracing::debug!(
tx = %id,
%key,
"Local update complete, now forwarding UPDATE to target peer"
);

if let Some(stats) = &mut update_op.stats {
stats.target = Some(target.clone());
}
Expand All @@ -1026,7 +1055,7 @@ pub(crate) async fn request_update(
sender,
related_contracts,
target,
value,
value: updated_value, // Send the updated value, not the original
};

let op = UpdateOp {
Expand Down
11 changes: 11 additions & 0 deletions crates/core/tests/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ async fn test_gateway_reconnection() -> TestResult {
recv_contract.as_ref().expect("Contract should exist").key(),
contract_key
);
if recv_state != wrapped_state {
eprintln!("State mismatch!");
eprintln!(
"Expected state: {:?}",
String::from_utf8_lossy(wrapped_state.as_ref())
);
eprintln!(
"Received state: {:?}",
String::from_utf8_lossy(recv_state.as_ref())
);
}
assert_eq!(recv_state, wrapped_state);
tracing::info!("Initial GET successful");
}
Expand Down
30 changes: 19 additions & 11 deletions tests/test-contract-integration/src/lib.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this tests should use the new test infrastructure and annotate each peer futures with an identifier using instrument.

Look at connectivity.rs and operations.ra integration tests in core for example.

Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ impl ContractInterface for Contract {
data: Vec<UpdateData<'static>>,
) -> Result<UpdateModification<'static>, ContractError> {
// Deserialize the current state
let mut todo_list: TodoList = match serde_json::from_slice(state.as_ref()) {
let original_state_bytes = state.as_ref();
let mut todo_list: TodoList = match serde_json::from_slice(original_state_bytes) {
Ok(list) => list,
Err(e) => return Err(ContractError::Deser(e.to_string())),
};
Expand Down Expand Up @@ -161,16 +162,23 @@ impl ContractInterface for Contract {
}
}

// Increment the state version
todo_list.version += 1;

// Serialize the new state
let new_state = match serde_json::to_vec(&todo_list) {
Ok(bytes) => State::from(bytes),
Err(e) => return Err(ContractError::Other(e.to_string())),
};

Ok(UpdateModification::valid(new_state))
// Check if the state actually changed by comparing serialized forms
let new_state_bytes =
serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?;
let state_changed = original_state_bytes != new_state_bytes.as_slice();

// Only increment version if the state actually changed
// This prevents double-incrementing when the same state is merged at multiple peers
if state_changed {
todo_list.version += 1;
// Re-serialize with incremented version
let new_state =
serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?;
Ok(UpdateModification::valid(State::from(new_state)))
} else {
// Reuse already serialized bytes since state didn't change
Ok(UpdateModification::valid(State::from(new_state_bytes)))
}
}

fn summarize_state(
Expand Down
Loading