Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions hyperprocess_macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ syn = { version = "2.0", features = ["full"] }
anyhow = "1.0"
futures-util = "0.3"
hyperware_app_common = { path = "../hyperware_app_common" }
#hyperware_process_lib = { version = "1.0.4", features = ["logging"] }
hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "b7c9d27" }
hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "cfd6843" }
once_cell = "1.20.2"
paste = "1.0"
process_macros = "0.1.0"
Expand Down
45 changes: 17 additions & 28 deletions hyperprocess_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,10 +1252,6 @@ fn generate_message_handlers(
format!("Handler {} requires a request body", stringify!(#fn_name)).into_bytes()
);
}

hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_path = None;
});
return;
}
}
Expand Down Expand Up @@ -1308,19 +1304,8 @@ fn generate_message_handlers(

let handler_body = if handler.is_async {
quote! {
// Capture context values before async execution
let current_path = hyperware_app_common::get_path();
let current_method = hyperware_app_common::get_http_method();

let state_ptr: *mut #self_ty = state;
hyperware_app_common::hyper! {
// Restore context in the async task
hyperware_app_common::APP_HELPERS.with(|ctx| {
let mut ctx_mut = ctx.borrow_mut();
ctx_mut.current_path = current_path;
ctx_mut.current_http_method = current_method;
});

let result = unsafe { (*state_ptr).#fn_name().await };
#response_handling
}
Expand All @@ -1340,9 +1325,6 @@ fn generate_message_handlers(
if #path_check && #method_check {
hyperware_process_lib::logging::debug!("Matched parameter-less handler {} for {} {}", stringify!(#fn_name), http_method, current_path);
#handler_body
hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_path = None;
});
return;
}
}
Expand Down Expand Up @@ -1418,9 +1400,6 @@ fn generate_message_handlers(
#http_request_match_arms
hyperware_app_common::maybe_save_state(&mut *state);
}
hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_path = None;
});
return;
},
Err(e) => {
Expand All @@ -1446,9 +1425,6 @@ fn generate_message_handlers(
None,
error_details.into_bytes()
);
hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_path = None;
});
return;
}
}
Expand All @@ -1466,9 +1442,6 @@ fn generate_message_handlers(
None,
format!("No handler found for {} {}", http_method, current_path).into_bytes(),
);
hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_path = None;
});
},
hyperware_process_lib::http::server::HttpServerRequest::WebSocketPush { channel_id, message_type } => {
hyperware_process_lib::logging::debug!("Received WebSocket message on channel {}, type: {:?}", channel_id, message_type);
Expand Down Expand Up @@ -1609,7 +1582,7 @@ fn generate_component_impl(
// Extract values from args for use in the quote macro
let name = &args.name;
let endpoints = &args.endpoints;
let _save_config = &args.save_config;
let save_config = &args.save_config;
let wit_world = &args.wit_world;

let icon = match &args.icon {
Expand Down Expand Up @@ -1683,6 +1656,11 @@ fn generate_component_impl(
// Initialize our state
let mut state = hyperware_app_common::initialize_state::<#self_ty>();

// Set to persist state according to user setting
hyperware_app_common::APP_CONTEXT.with(|ctx| {
ctx.borrow_mut().hidden_state = Some(hyperware_app_common::HiddenState::new(#save_config));
});

// Set up necessary components
let app_name = #name;
let app_icon = #icon;
Expand Down Expand Up @@ -1719,6 +1697,11 @@ fn generate_component_impl(
hyperware_app_common::APP_HELPERS.with(|ctx| {
ctx.borrow_mut().current_message = Some(message.clone());
});

// Store old state if needed (for OnDiff save option)
// This only stores if old_state is None (first time or after a save)
hyperware_app_common::store_old_state(&state);

match message {
hyperware_process_lib::Message::Response { body, context, .. } => {
let correlation_id = context
Expand All @@ -1734,6 +1717,12 @@ fn generate_component_impl(
hyperware_process_lib::Message::Request { .. } => {
if message.is_local() && message.source().process == "http-server:distro:sys" {
handle_http_server_message(&mut state, message);
hyperware_app_common::APP_HELPERS.with(|ctx| {
let mut ctx_mut = ctx.borrow_mut();
ctx_mut.current_path = None;
ctx_mut.current_http_method = None;
ctx_mut.response_headers = std::collections::HashMap::new();
});
} else if message.is_local() {
handle_local_message(&mut state, message);
} else {
Expand Down
3 changes: 1 addition & 2 deletions hyperware_app_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ publish = false

[dependencies]
anyhow = "1.0"
#hyperware_process_lib = { version = "1.0.4", features = ["logging"] }
hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "b7c9d27" }
hyperware_process_lib = { git = "https://github.com/hyperware-ai/process_lib", features = ["logging"], rev = "cfd6843" }
futures-util = "0.3"
once_cell = "1.20.2"
paste = "1.0"
Expand Down
68 changes: 66 additions & 2 deletions hyperware_app_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures_util::task::noop_waker_ref;
use hyperware_process_lib::http::server::HttpServer;
use hyperware_process_lib::logging::info;
use hyperware_process_lib::{
get_state, http, kiprintln, set_state, BuildError, LazyLoadBlob, Message, Request, SendError,
get_state, http, kiprintln, set_state, timer, BuildError, LazyLoadBlob, Message, Request, SendError,
};
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -186,6 +186,21 @@ pub enum AppSendError {
BuildError(BuildError),
}

pub async fn sleep(sleep_ms: u64) -> Result<(), AppSendError> {
let request = Request::to(("our", "timer", "distro", "sys"))
.body(timer::TimerAction::SetTimer(sleep_ms))
.expects_response((sleep_ms / 1_000) + 1);

let correlation_id = Uuid::new_v4().to_string();
if let Err(e) = request.context(correlation_id.as_bytes().to_vec()).send() {
return Err(AppSendError::BuildError(e));
}

let _ = ResponseFuture::new(correlation_id).await;

return Ok(());
}

pub async fn send<R>(request: Request) -> Result<R, AppSendError>
where
R: serde::de::DeserializeOwned,
Expand Down Expand Up @@ -258,17 +273,21 @@ pub enum SaveOptions {
EveryNMessage(u64),
// Persist State Every N Seconds
EveryNSeconds(u64),
// Persist State Only If Changed
OnDiff,
}
pub struct HiddenState {
save_config: SaveOptions,
message_count: u64,
old_state: Option<Vec<u8>>, // Stores the serialized state from before message processing
}

impl HiddenState {
pub fn new(save_config: SaveOptions) -> Self {
Self {
save_config,
message_count: 0,
old_state: None,
}
}

Expand All @@ -286,12 +305,32 @@ impl HiddenState {
}
}
SaveOptions::EveryNSeconds(_) => false, // Handled by timer instead
SaveOptions::OnDiff => false, // Will be handled separately with state comparison
}
}
}

// TODO: We need a timer macro again.

/// Store a snapshot of the current state before processing a message
/// This is used for OnDiff save option to compare state before and after
/// Only stores if old_state is None (i.e., first time or after a save)
pub fn store_old_state<S>(state: &S)
where
S: serde::Serialize,
{
APP_CONTEXT.with(|ctx| {
let mut ctx_mut = ctx.borrow_mut();
if let Some(ref mut hidden_state) = ctx_mut.hidden_state {
if matches!(hidden_state.save_config, SaveOptions::OnDiff) && hidden_state.old_state.is_none() {
if let Ok(s_bytes) = rmp_serde::to_vec(state) {
hidden_state.old_state = Some(s_bytes);
}
}
}
});
}

/// Trait that must be implemented by application state types
pub trait State {
/// Creates a new instance of the state.
Expand Down Expand Up @@ -439,10 +478,35 @@ where
APP_CONTEXT.with(|ctx| {
let mut ctx_mut = ctx.borrow_mut();
if let Some(ref mut hidden_state) = ctx_mut.hidden_state {
if hidden_state.should_save_state() {
let should_save = if matches!(hidden_state.save_config, SaveOptions::OnDiff) {
// For OnDiff, compare current state with old state
if let Ok(current_bytes) = rmp_serde::to_vec(state) {
let state_changed = match &hidden_state.old_state {
Some(old_bytes) => old_bytes != &current_bytes,
None => true, // If no old state, consider it changed
};

if state_changed {
true
} else {
false
}
} else {
false
}
} else {
hidden_state.should_save_state()
};

if should_save {
if let Ok(s_bytes) = rmp_serde::to_vec(state) {
kiprintln!("State persisted");
let _ = set_state(&s_bytes);

// Clear old_state after saving so it can be set again on next message
if matches!(hidden_state.save_config, SaveOptions::OnDiff) {
hidden_state.old_state = None;
}
}
}
}
Expand Down