diff --git a/hyperprocess_macro/Cargo.toml b/hyperprocess_macro/Cargo.toml index 74f4b76..ca00597 100644 --- a/hyperprocess_macro/Cargo.toml +++ b/hyperprocess_macro/Cargo.toml @@ -14,8 +14,7 @@ syn = { version = "2.0", features = ["full"] } anyhow = "1.0" futures-util = "0.3" hyperware_app_common = { path = "../hyperware_app_common" } -#hyperware_process_lib = { version = "1.0.4", features = ["logging"] } -hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "b7c9d27" } +hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "cfd6843" } once_cell = "1.20.2" paste = "1.0" process_macros = "0.1.0" diff --git a/hyperprocess_macro/src/lib.rs b/hyperprocess_macro/src/lib.rs index 29061f7..6164c03 100644 --- a/hyperprocess_macro/src/lib.rs +++ b/hyperprocess_macro/src/lib.rs @@ -1252,10 +1252,6 @@ fn generate_message_handlers( format!("Handler {} requires a request body", stringify!(#fn_name)).into_bytes() ); } - - hyperware_app_common::APP_HELPERS.with(|ctx| { - ctx.borrow_mut().current_path = None; - }); return; } } @@ -1308,19 +1304,8 @@ fn generate_message_handlers( let handler_body = if handler.is_async { quote! { - // Capture context values before async execution - let current_path = hyperware_app_common::get_path(); - let current_method = hyperware_app_common::get_http_method(); - let state_ptr: *mut #self_ty = state; hyperware_app_common::hyper! { - // Restore context in the async task - hyperware_app_common::APP_HELPERS.with(|ctx| { - let mut ctx_mut = ctx.borrow_mut(); - ctx_mut.current_path = current_path; - ctx_mut.current_http_method = current_method; - }); - let result = unsafe { (*state_ptr).#fn_name().await }; #response_handling } @@ -1340,9 +1325,6 @@ fn generate_message_handlers( if #path_check && #method_check { hyperware_process_lib::logging::debug!("Matched parameter-less handler {} for {} {}", stringify!(#fn_name), http_method, current_path); #handler_body - hyperware_app_common::APP_HELPERS.with(|ctx| { - ctx.borrow_mut().current_path = None; - }); return; } } @@ -1418,9 +1400,6 @@ fn generate_message_handlers( #http_request_match_arms hyperware_app_common::maybe_save_state(&mut *state); } - hyperware_app_common::APP_HELPERS.with(|ctx| { - ctx.borrow_mut().current_path = None; - }); return; }, Err(e) => { @@ -1446,9 +1425,6 @@ fn generate_message_handlers( None, error_details.into_bytes() ); - hyperware_app_common::APP_HELPERS.with(|ctx| { - ctx.borrow_mut().current_path = None; - }); return; } } @@ -1466,9 +1442,6 @@ fn generate_message_handlers( None, format!("No handler found for {} {}", http_method, current_path).into_bytes(), ); - hyperware_app_common::APP_HELPERS.with(|ctx| { - ctx.borrow_mut().current_path = None; - }); }, hyperware_process_lib::http::server::HttpServerRequest::WebSocketPush { channel_id, message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket message on channel {}, type: {:?}", channel_id, message_type); @@ -1609,7 +1582,7 @@ fn generate_component_impl( // Extract values from args for use in the quote macro let name = &args.name; let endpoints = &args.endpoints; - let _save_config = &args.save_config; + let save_config = &args.save_config; let wit_world = &args.wit_world; let icon = match &args.icon { @@ -1683,6 +1656,11 @@ fn generate_component_impl( // Initialize our state let mut state = hyperware_app_common::initialize_state::<#self_ty>(); + // Set to persist state according to user setting + hyperware_app_common::APP_CONTEXT.with(|ctx| { + ctx.borrow_mut().hidden_state = Some(hyperware_app_common::HiddenState::new(#save_config)); + }); + // Set up necessary components let app_name = #name; let app_icon = #icon; @@ -1719,6 +1697,11 @@ fn generate_component_impl( hyperware_app_common::APP_HELPERS.with(|ctx| { ctx.borrow_mut().current_message = Some(message.clone()); }); + + // Store old state if needed (for OnDiff save option) + // This only stores if old_state is None (first time or after a save) + hyperware_app_common::store_old_state(&state); + match message { hyperware_process_lib::Message::Response { body, context, .. } => { let correlation_id = context @@ -1734,6 +1717,12 @@ fn generate_component_impl( hyperware_process_lib::Message::Request { .. } => { if message.is_local() && message.source().process == "http-server:distro:sys" { handle_http_server_message(&mut state, message); + hyperware_app_common::APP_HELPERS.with(|ctx| { + let mut ctx_mut = ctx.borrow_mut(); + ctx_mut.current_path = None; + ctx_mut.current_http_method = None; + ctx_mut.response_headers = std::collections::HashMap::new(); + }); } else if message.is_local() { handle_local_message(&mut state, message); } else { diff --git a/hyperware_app_common/Cargo.toml b/hyperware_app_common/Cargo.toml index b2c2ba4..ecaaa8c 100644 --- a/hyperware_app_common/Cargo.toml +++ b/hyperware_app_common/Cargo.toml @@ -6,8 +6,7 @@ publish = false [dependencies] anyhow = "1.0" -#hyperware_process_lib = { version = "1.0.4", features = ["logging"] } -hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "b7c9d27" } +hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "cfd6843" } futures-util = "0.3" once_cell = "1.20.2" paste = "1.0" diff --git a/hyperware_app_common/src/lib.rs b/hyperware_app_common/src/lib.rs index abab84b..4f81abf 100644 --- a/hyperware_app_common/src/lib.rs +++ b/hyperware_app_common/src/lib.rs @@ -9,7 +9,7 @@ use futures_util::task::noop_waker_ref; use hyperware_process_lib::http::server::HttpServer; use hyperware_process_lib::logging::info; use hyperware_process_lib::{ - get_state, http, kiprintln, set_state, BuildError, LazyLoadBlob, Message, Request, SendError, + get_state, http, kiprintln, set_state, timer, BuildError, LazyLoadBlob, Message, Request, SendError, }; use serde::Deserialize; use serde::Serialize; @@ -186,6 +186,21 @@ pub enum AppSendError { BuildError(BuildError), } +pub async fn sleep(sleep_ms: u64) -> Result<(), AppSendError> { + let request = Request::to(("our", "timer", "distro", "sys")) + .body(timer::TimerAction::SetTimer(sleep_ms)) + .expects_response((sleep_ms / 1_000) + 1); + + let correlation_id = Uuid::new_v4().to_string(); + if let Err(e) = request.context(correlation_id.as_bytes().to_vec()).send() { + return Err(AppSendError::BuildError(e)); + } + + let _ = ResponseFuture::new(correlation_id).await; + + return Ok(()); +} + pub async fn send(request: Request) -> Result where R: serde::de::DeserializeOwned, @@ -258,10 +273,13 @@ pub enum SaveOptions { EveryNMessage(u64), // Persist State Every N Seconds EveryNSeconds(u64), + // Persist State Only If Changed + OnDiff, } pub struct HiddenState { save_config: SaveOptions, message_count: u64, + old_state: Option>, // Stores the serialized state from before message processing } impl HiddenState { @@ -269,6 +287,7 @@ impl HiddenState { Self { save_config, message_count: 0, + old_state: None, } } @@ -286,12 +305,32 @@ impl HiddenState { } } SaveOptions::EveryNSeconds(_) => false, // Handled by timer instead + SaveOptions::OnDiff => false, // Will be handled separately with state comparison } } } // TODO: We need a timer macro again. +/// Store a snapshot of the current state before processing a message +/// This is used for OnDiff save option to compare state before and after +/// Only stores if old_state is None (i.e., first time or after a save) +pub fn store_old_state(state: &S) +where + S: serde::Serialize, +{ + APP_CONTEXT.with(|ctx| { + let mut ctx_mut = ctx.borrow_mut(); + if let Some(ref mut hidden_state) = ctx_mut.hidden_state { + if matches!(hidden_state.save_config, SaveOptions::OnDiff) && hidden_state.old_state.is_none() { + if let Ok(s_bytes) = rmp_serde::to_vec(state) { + hidden_state.old_state = Some(s_bytes); + } + } + } + }); +} + /// Trait that must be implemented by application state types pub trait State { /// Creates a new instance of the state. @@ -439,10 +478,35 @@ where APP_CONTEXT.with(|ctx| { let mut ctx_mut = ctx.borrow_mut(); if let Some(ref mut hidden_state) = ctx_mut.hidden_state { - if hidden_state.should_save_state() { + let should_save = if matches!(hidden_state.save_config, SaveOptions::OnDiff) { + // For OnDiff, compare current state with old state + if let Ok(current_bytes) = rmp_serde::to_vec(state) { + let state_changed = match &hidden_state.old_state { + Some(old_bytes) => old_bytes != ¤t_bytes, + None => true, // If no old state, consider it changed + }; + + if state_changed { + true + } else { + false + } + } else { + false + } + } else { + hidden_state.should_save_state() + }; + + if should_save { if let Ok(s_bytes) = rmp_serde::to_vec(state) { kiprintln!("State persisted"); let _ = set_state(&s_bytes); + + // Clear old_state after saving so it can be set again on next message + if matches!(hidden_state.save_config, SaveOptions::OnDiff) { + hidden_state.old_state = None; + } } } }