diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index dcb5e9927..98123cf40 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -300,6 +300,12 @@ impl ContractExecutor for Executor { if updated_state.as_ref() == current_state.as_ref() { Ok(UpsertResult::NoChange) } else { + // Persist the updated state before returning + self.state_store + .update(&key, updated_state.clone()) + .await + .map_err(ExecutorError::other)?; + // todo: forward delta like we are doing with puts tracing::warn!("Delta updates are not yet supported"); Ok(UpsertResult::Updated(updated_state)) diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index 10aa328dd..ad3597a64 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -537,6 +537,262 @@ async fn test_update_contract() -> TestResult { Ok(()) } +/// Test that a second PUT to an already cached contract persists the merged state. +/// This is a regression test for issue #1995. +#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))] +async fn test_put_merge_persists_state() -> TestResult { + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + // Create initial state with empty todo list + let initial_state = test_utils::create_empty_todo_list(); + let initial_wrapped_state = WrappedState::from(initial_state); + + // Create network sockets + let network_socket_b = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?; + + // Configure gateway node B + let (config_b, preset_cfg_b, config_b_gw) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_b.local_addr()?.port()), + ws_api_port_socket_b.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap(); + + // Configure peer node A + let (config_a, preset_cfg_a) = base_node_test_config( + false, + vec![serde_json::to_string(&config_b_gw)?], + None, + ws_api_port_socket_a.local_addr()?.port(), + ) + .await?; + let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap(); + + tracing::info!("Node A data dir: {:?}", preset_cfg_a.temp_dir.path()); + tracing::info!("Node B (gw) data dir: {:?}", preset_cfg_b.temp_dir.path()); + + // Start node A (peer) + std::mem::drop(ws_api_port_socket_a); + let node_a = async move { + let _span = with_peer_id("peer-a"); + tracing::info!("Starting peer A node"); + let config = config_a.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Peer A node running"); + node.run().await + } + .boxed_local(); + + // Start node B (gateway) + std::mem::drop(network_socket_b); + std::mem::drop(ws_api_port_socket_b); + let node_b = async { + let _span = with_peer_id("gateway"); + tracing::info!("Starting gateway node"); + let config = config_b.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + tracing::info!("Gateway node running"); + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(180), async { + // Wait for nodes to start up + tracing::info!("Waiting for nodes to start up..."); + tokio::time::sleep(Duration::from_secs(15)).await; + tracing::info!("Nodes should be ready, proceeding with test..."); + + // Connect to node A's websocket API + let uri = format!( + "ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native" + ); + let (stream, _) = connect_async(&uri).await?; + let mut client_api_a = WebApi::start(stream); + + // First PUT: Store initial contract state + tracing::info!("Sending first PUT with initial state..."); + make_put( + &mut client_api_a, + initial_wrapped_state.clone(), + contract.clone(), + false, + ) + .await?; + + // Wait for first put response + let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + tracing::info!("First PUT successful for contract: {}", key); + assert_eq!(key, contract_key); + } + Ok(Ok(other)) => { + bail!("Unexpected response for first PUT: {:?}", other); + } + Ok(Err(e)) => { + bail!("Error receiving first PUT response: {}", e); + } + Err(_) => { + bail!("Timeout waiting for first PUT response"); + } + } + + // Wait a bit to ensure state is fully cached + tokio::time::sleep(Duration::from_secs(2)).await; + + // Create updated state with more data (simulating a state merge) + let mut updated_todo_list: test_utils::TodoList = + serde_json::from_slice(initial_wrapped_state.as_ref()).unwrap(); + + // Add multiple tasks to make the state larger + for i in 1..=5 { + updated_todo_list.tasks.push(test_utils::Task { + id: i, + title: format!("Task {}", i), + description: format!("Description for task {}", i), + completed: false, + priority: i as u8, + }); + } + + let updated_bytes = serde_json::to_vec(&updated_todo_list).unwrap(); + let updated_wrapped_state = WrappedState::from(updated_bytes); + + tracing::info!( + "Initial state size: {} bytes, Updated state size: {} bytes", + initial_wrapped_state.as_ref().len(), + updated_wrapped_state.as_ref().len() + ); + + // Second PUT: Update the already-cached contract with new state + // This tests the bug fix - the merged state should be persisted + tracing::info!("Sending second PUT with updated state..."); + make_put( + &mut client_api_a, + updated_wrapped_state.clone(), + contract.clone(), + false, + ) + .await?; + + // Wait for second put response + let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + tracing::info!("Second PUT successful for contract: {}", key); + assert_eq!(key, contract_key); + } + Ok(Ok(other)) => { + bail!("Unexpected response for second PUT: {:?}", other); + } + Ok(Err(e)) => { + bail!("Error receiving second PUT response: {}", e); + } + Err(_) => { + bail!("Timeout waiting for second PUT response"); + } + } + + // Wait a bit to ensure the merge and persistence completes + tokio::time::sleep(Duration::from_secs(2)).await; + + // The key test: GET from gateway to verify it persisted the merged state + // This is the bug from issue #1995 - gateway receives PUT for already-cached + // contract, merges state, but doesn't persist it + let uri = format!( + "ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native" + ); + let (stream, _) = connect_async(&uri).await?; + let mut client_api_gateway = WebApi::start(stream); + + tracing::info!("Getting contract from gateway to verify merged state was persisted..."); + let (response_contract_gw, response_state_gw) = get_contract( + &mut client_api_gateway, + contract_key, + &preset_cfg_b.temp_dir, + ) + .await?; + + assert_eq!(response_contract_gw.key(), contract_key); + + let response_todo_list_gw: test_utils::TodoList = + serde_json::from_slice(response_state_gw.as_ref()) + .expect("Failed to deserialize state from gateway"); + + tracing::info!( + "Gateway returned state with {} tasks, size {} bytes", + response_todo_list_gw.tasks.len(), + response_state_gw.as_ref().len() + ); + + // This is the key assertion for issue #1995: + // Gateway received a PUT for an already-cached contract, merged the states, + // and should have PERSISTED the merged state (not just computed it) + assert_eq!( + response_todo_list_gw.tasks.len(), + 5, + "Gateway should return merged state with 5 tasks (issue #1995: merged state must be persisted)" + ); + + // Verify the state size matches as additional confirmation + assert_eq!( + response_state_gw.as_ref().len(), + updated_wrapped_state.as_ref().len(), + "Gateway state size should match the updated state" + ); + + tracing::info!( + "✓ Test passed: Gateway correctly persisted merged state after second PUT (issue #1995 fixed)" + ); + + // Cleanup + client_api_a + .send(ClientRequest::Disconnect { cause: None }) + .await?; + client_api_gateway + .send(ClientRequest::Disconnect { cause: None }) + .await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + Ok::<_, anyhow::Error>(()) + }); + + select! { + a = node_a => { + let Err(a) = a; + return Err(anyhow!(a).into()); + } + b = node_b => { + let Err(b) = b; + return Err(anyhow!(b).into()); + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +} + // This test is disabled due to race conditions in subscription propagation logic. // The test expects multiple clients across different nodes to receive subscription updates, // but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.