Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ impl ContractExecutor for Executor<Runtime> {
if updated_state.as_ref() == current_state.as_ref() {
Ok(UpsertResult::NoChange)
} else {
// Persist the updated state before returning
self.state_store
.update(&key, updated_state.clone())
.await
.map_err(ExecutorError::other)?;

// todo: forward delta like we are doing with puts
tracing::warn!("Delta updates are not yet supported");
Ok(UpsertResult::Updated(updated_state))
Expand Down
256 changes: 256 additions & 0 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,262 @@ async fn test_update_contract() -> TestResult {
Ok(())
}

/// Test that a second PUT to an already cached contract persists the merged state.
/// This is a regression test for issue #1995.
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
async fn test_put_merge_persists_state() -> TestResult {
// Load test contract
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();

// Create initial state with empty todo list
let initial_state = test_utils::create_empty_todo_list();
let initial_wrapped_state = WrappedState::from(initial_state);

// Create network sockets
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;

// Configure gateway node B
let (config_b, preset_cfg_b, config_b_gw) = {
let (cfg, preset) = base_node_test_config(
true,
vec![],
Some(network_socket_b.local_addr()?.port()),
ws_api_port_socket_b.local_addr()?.port(),
)
.await?;
let public_port = cfg.network_api.public_port.unwrap();
let path = preset.temp_dir.path().to_path_buf();
(cfg, preset, gw_config(public_port, &path)?)
};
let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap();

// Configure peer node A
let (config_a, preset_cfg_a) = base_node_test_config(
false,
vec![serde_json::to_string(&config_b_gw)?],
None,
ws_api_port_socket_a.local_addr()?.port(),
)
.await?;
let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap();

tracing::info!("Node A data dir: {:?}", preset_cfg_a.temp_dir.path());
tracing::info!("Node B (gw) data dir: {:?}", preset_cfg_b.temp_dir.path());

// Start node A (peer)
std::mem::drop(ws_api_port_socket_a);
let node_a = async move {
let _span = with_peer_id("peer-a");
tracing::info!("Starting peer A node");
let config = config_a.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
tracing::info!("Peer A node running");
node.run().await
}
.boxed_local();

// Start node B (gateway)
std::mem::drop(network_socket_b);
std::mem::drop(ws_api_port_socket_b);
let node_b = async {
let _span = with_peer_id("gateway");
tracing::info!("Starting gateway node");
let config = config_b.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
tracing::info!("Gateway node running");
node.run().await
}
.boxed_local();

let test = tokio::time::timeout(Duration::from_secs(180), async {
// Wait for nodes to start up
tracing::info!("Waiting for nodes to start up...");
tokio::time::sleep(Duration::from_secs(15)).await;
tracing::info!("Nodes should be ready, proceeding with test...");

// Connect to node A's websocket API
let uri = format!(
"ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native"
);
let (stream, _) = connect_async(&uri).await?;
let mut client_api_a = WebApi::start(stream);

// First PUT: Store initial contract state
tracing::info!("Sending first PUT with initial state...");
make_put(
&mut client_api_a,
initial_wrapped_state.clone(),
contract.clone(),
false,
)
.await?;

// Wait for first put response
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!("First PUT successful for contract: {}", key);
assert_eq!(key, contract_key);
}
Ok(Ok(other)) => {
bail!("Unexpected response for first PUT: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving first PUT response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for first PUT response");
}
}

// Wait a bit to ensure state is fully cached
tokio::time::sleep(Duration::from_secs(2)).await;

// Create updated state with more data (simulating a state merge)
let mut updated_todo_list: test_utils::TodoList =
serde_json::from_slice(initial_wrapped_state.as_ref()).unwrap();

// Add multiple tasks to make the state larger
for i in 1..=5 {
updated_todo_list.tasks.push(test_utils::Task {
id: i,
title: format!("Task {}", i),
description: format!("Description for task {}", i),
completed: false,
priority: i as u8,
});
}

let updated_bytes = serde_json::to_vec(&updated_todo_list).unwrap();
let updated_wrapped_state = WrappedState::from(updated_bytes);

tracing::info!(
"Initial state size: {} bytes, Updated state size: {} bytes",
initial_wrapped_state.as_ref().len(),
updated_wrapped_state.as_ref().len()
);

// Second PUT: Update the already-cached contract with new state
// This tests the bug fix - the merged state should be persisted
tracing::info!("Sending second PUT with updated state...");
make_put(
&mut client_api_a,
updated_wrapped_state.clone(),
contract.clone(),
false,
)
.await?;

// Wait for second put response
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!("Second PUT successful for contract: {}", key);
assert_eq!(key, contract_key);
}
Ok(Ok(other)) => {
bail!("Unexpected response for second PUT: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving second PUT response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for second PUT response");
}
}

// Wait a bit to ensure the merge and persistence completes
tokio::time::sleep(Duration::from_secs(2)).await;

// The key test: GET from gateway to verify it persisted the merged state
// This is the bug from issue #1995 - gateway receives PUT for already-cached
// contract, merges state, but doesn't persist it
let uri = format!(
"ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native"
);
let (stream, _) = connect_async(&uri).await?;
let mut client_api_gateway = WebApi::start(stream);

tracing::info!("Getting contract from gateway to verify merged state was persisted...");
let (response_contract_gw, response_state_gw) = get_contract(
&mut client_api_gateway,
contract_key,
&preset_cfg_b.temp_dir,
)
.await?;

assert_eq!(response_contract_gw.key(), contract_key);

let response_todo_list_gw: test_utils::TodoList =
serde_json::from_slice(response_state_gw.as_ref())
.expect("Failed to deserialize state from gateway");

tracing::info!(
"Gateway returned state with {} tasks, size {} bytes",
response_todo_list_gw.tasks.len(),
response_state_gw.as_ref().len()
);

// This is the key assertion for issue #1995:
// Gateway received a PUT for an already-cached contract, merged the states,
// and should have PERSISTED the merged state (not just computed it)
assert_eq!(
response_todo_list_gw.tasks.len(),
5,
"Gateway should return merged state with 5 tasks (issue #1995: merged state must be persisted)"
);

// Verify the state size matches as additional confirmation
assert_eq!(
response_state_gw.as_ref().len(),
updated_wrapped_state.as_ref().len(),
"Gateway state size should match the updated state"
);

tracing::info!(
"✓ Test passed: Gateway correctly persisted merged state after second PUT (issue #1995 fixed)"
);

// Cleanup
client_api_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
client_api_gateway
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;

Ok::<_, anyhow::Error>(())
});

select! {
a = node_a => {
let Err(a) = a;
return Err(anyhow!(a).into());
}
b = node_b => {
let Err(b) = b;
return Err(anyhow!(b).into());
}
r = test => {
r??;
tokio::time::sleep(Duration::from_secs(3)).await;
}
}

Ok(())
}

// This test is disabled due to race conditions in subscription propagation logic.
// The test expects multiple clients across different nodes to receive subscription updates,
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
Expand Down
Loading