diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 1ef1c7e35..8b2d55c92 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -295,15 +295,44 @@ impl Operation for PutOp { htl: *htl, }); - // Transition to AwaitingResponse state to handle future SuccessfulPut messages - new_state = Some(PutState::AwaitingResponse { - key, - upstream: Some(prev_sender.clone()), - contract: contract.clone(), - state: modified_value, - subscribe, - origin: origin.clone(), - }); + // When we're the origin node we already seeded the contract locally. + // Treat downstream SuccessfulPut messages as best-effort so River is unblocked. + if origin.peer == own_location.peer { + tracing::debug!( + tx = %id, + %key, + "Origin node finishing PUT without waiting for SuccessfulPut ack" + ); + + if subscribe { + if !op_manager.failed_parents().contains(id) { + let child_tx = + super::start_subscription_request(op_manager, *id, key); + tracing::debug!( + tx = %id, + %child_tx, + "started subscription as child operation" + ); + } else { + tracing::warn!( + tx = %id, + "not starting subscription for failed parent operation" + ); + } + } + + new_state = Some(PutState::Finished { key }); + } else { + // Transition to AwaitingResponse state to handle future SuccessfulPut messages + new_state = Some(PutState::AwaitingResponse { + key, + upstream: Some(prev_sender.clone()), + contract: contract.clone(), + state: modified_value, + subscribe, + origin: origin.clone(), + }); + } } else { // No other peers to forward to - we're the final destination tracing::warn!(