Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 68 additions & 64 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,81 +968,85 @@ impl Operation for GetOp {

// Put contract locally if needed
if should_put {
tracing::debug!(tx = %id, %key, %is_original_requester, %subscribe_requested, "Putting contract at executor");
let res = op_manager
.notify_contract_handler(ContractHandlerEvent::PutQuery {
// First check if the local state matches the incoming state
// to avoid triggering validation errors in contracts that implement
// idempotency checks in their update_state() method (issue #2018)
let local_state = op_manager
.notify_contract_handler(ContractHandlerEvent::GetQuery {
key,
state: value.clone(),
related_contracts: RelatedContracts::default(), // fixme: i think we need to get the related contracts so the final put is ok
contract: contract.clone(),
return_contract_code: false,
})
.await?;

match res {
ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {
tracing::debug!(tx = %id, %key, "Contract put at executor");
let is_subscribed_contract =
op_manager.ring.is_seeding_contract(&key);

// Start subscription if not already seeding
if !is_subscribed_contract {
tracing::debug!(tx = %id, %key, peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), "Contract not cached @ peer, caching");
op_manager.ring.seed_contract(key);
let mut new_skip_list = skip_list.clone();
new_skip_list.insert(sender.peer.clone());

super::start_subscription_request(op_manager, key).await;
}
.await;

let state_matches = match local_state {
Ok(ContractHandlerEvent::GetResponse {
response:
Ok(StoreResponse {
state: Some(local), ..
}),
..
}) => {
// Compare the actual state bytes
local.as_ref() == value.as_ref()
}
ContractHandlerEvent::PutResponse {
new_value: Err(err),
} => {
if is_original_requester {
// Original requester, return error
tracing::debug!(tx = %id, error = %err, "Failed put at executor");
return Err(OpError::ExecutorError(err));
} else {
// Forward error to requester
let mut new_skip_list = skip_list.clone();
new_skip_list.insert(sender.peer.clone());
_ => false, // No local state or error - we should try to cache
};

let requester = requester.unwrap();
if state_matches {
tracing::debug!(
tx = %id,
%key,
"Local state matches network state, skipping redundant cache"
);
// State already cached and identical, mark as seeded if needed
if !op_manager.ring.is_seeding_contract(&key) {
tracing::debug!(tx = %id, %key, "Marking contract as seeded");
op_manager.ring.seed_contract(key);
super::start_subscription_request(op_manager, key).await;
}
} else {
tracing::debug!(tx = %id, %key, %is_original_requester, %subscribe_requested, "Putting contract at executor - state differs from local cache");
let res = op_manager
.notify_contract_handler(ContractHandlerEvent::PutQuery {
key,
state: value.clone(),
related_contracts: RelatedContracts::default(), // fixme: i think we need to get the related contracts so the final put is ok
contract: contract.clone(),
})
.await?;

match res {
ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {
tracing::debug!(tx = %id, %key, "Contract put at executor");
let is_subscribed_contract =
op_manager.ring.is_seeding_contract(&key);

// Start subscription if not already seeding
if !is_subscribed_contract {
tracing::debug!(tx = %id, %key, peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), "Contract not cached @ peer, caching");
op_manager.ring.seed_contract(key);
super::start_subscription_request(op_manager, key).await;
}
}
ContractHandlerEvent::PutResponse {
new_value: Err(err),
} => {
// Local caching failed, but GET operation succeeded
// Log warning and continue - caching is an optimization, not required
tracing::warn!(
tx = %id,
%key,
%sender.peer,
target = %requester,
"Failed put at executor, returning response to requester",
error = %err,
%is_original_requester,
"Failed to cache contract locally during GET - continuing with operation"
);

op_manager
.notify_op_change(
NetMessage::from(GetMsg::ReturnGet {
id,
key,
value: StoreResponse {
state: None,
contract: None,
},
sender: sender.clone(),
target: requester.clone(),
skip_list: new_skip_list,
}),
OpEnum::Get(GetOp {
id,
state: self.state,
result: None,
stats,
}),
)
.await?;
return Err(OpError::StatePushed);
// Don't return error - the GET succeeded, caching is optional
// Continue to process the GET result below
}
_ => unreachable!(
"PutQuery from Get operation should always return PutResponse"
),
}
_ => unreachable!(
"PutQuery from Get operation should always return PutResponse"
),
}
}

Expand Down
Loading