From 85917226e5dbf5194da0ad0366a049a4f1836fcb Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 18 Aug 2025 16:44:55 -0700 Subject: [PATCH 1/8] add #[ws_client] --- Cargo.lock | 2 +- README.md | 186 ++++++++++++++++++++++++++++---------------------- src/lib.rs | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 293 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0e815b..638b39d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1609,7 +1609,7 @@ dependencies = [ [[package]] name = "hyperware_process_lib" version = "2.0.1" -source = "git+https://github.com/hyperware-ai/process_lib?rev=604edfd#604edfdb6e065c37102bdd64bda0ea2979a8e042" +source = "git+https://github.com/hyperware-ai/process_lib?rev=c27a881#c27a881e764920636ac025112533a714d622793c" dependencies = [ "alloy", "alloy-primitives", diff --git a/README.md b/README.md index d81c270..f1f1058 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,9 @@ struct MyProcessState { name = "My Process", ui = Some(HttpBindingConfig::default()), endpoints = vec![ - Binding::Http { - path: "/api", - config: HttpBindingConfig::new(false, false, false, None) + Binding::Http { + path: "/api", + config: HttpBindingConfig::new(false, false, false, None) } ], save_config = SaveOptions::EveryMessage, @@ -85,7 +85,7 @@ impl MyProcessState { async fn initialize(&mut self) { // Initialize your process } - + #[http] async fn handle_http_request(&mut self, value: String) -> String { self.counter += 1; @@ -216,14 +216,14 @@ fn handle_any_method(&mut self) -> Response { **Supported Methods**: `GET`, `POST`, `PUT`, `DELETE`, `PATCH`, `HEAD`, `OPTIONS` -### Smart Routing System +### Smart Routing System The framework uses intelligent priority-based routing that automatically chooses the best handler based on the request: #### **Priority Logic:** 1. **Has Request Body** → Tries parameterized handlers first - - Deserializes body to determine the correct handler + - Deserializes body to determine the correct handler - Falls back to parameter-less handlers if deserialization fails 2. **No Request Body** → Tries parameter-less handlers first @@ -238,7 +238,7 @@ The framework uses intelligent priority-based routing that automatically chooses #[http(method = "GET", path = "/health")] fn health_check(&mut self) -> &'static str { "OK" } -#[http(method = "DELETE", path = "/api/users")] +#[http(method = "DELETE", path = "/api/users")] fn delete_all_users(&mut self) -> Result { // DELETE requests typically have no body self.users.clear(); @@ -276,10 +276,10 @@ async fn async_handler(&mut self, data: MyData) -> Result { // get_path() and get_http_method() work correctly in async handlers! let path = get_path().unwrap_or_default(); let method = get_http_method().unwrap_or_default(); - + // Make async calls to other services let result = external_api_call(data).await?; - + Ok(format!("Processed {} {} with result: {}", method, path, result)) } ``` @@ -289,7 +289,7 @@ async fn async_handler(&mut self, data: MyData) -> Result { ```rust // Request: POST /api/upload (with body) // 1. ✅ Tries: create_item handler if body matches {"CreateItem": ...} -// 2. ✅ Tries: update_settings handler if body matches {"UpdateSettings": ...} +// 2. ✅ Tries: update_settings handler if body matches {"UpdateSettings": ...} // 3. ✅ Falls back to: handle_post_with_data for unmatched bodies // 4. ✅ Ultimate fallback: handle_any_method @@ -318,7 +318,7 @@ impl MyApp { // This handler ONLY responds to GET /api/users self.users.clone() } - + // Handler with parameters (path optional but recommended) #[http(method = "POST", path = "/api/users")] fn create_user(&mut self, user: NewUser) -> User { @@ -327,14 +327,14 @@ impl MyApp { self.users.push(user.clone()); user } - + // Parameter-less handler accepting all methods (path optional) #[http(path = "/api/status")] fn api_status(&mut self) -> Status { // This handles ALL methods to /api/status Status { healthy: true } } - + // Parameter-less handler for any path - uses get_path() for routing #[http] fn dynamic_handler(&mut self) -> Response { @@ -344,7 +344,7 @@ impl MyApp { _ => Response::not_found("Unknown endpoint") } } - + // Handler with parameters without specific path #[http(method = "POST")] fn generic_post_handler(&mut self, data: GenericData) -> Response { @@ -389,18 +389,40 @@ async fn initialize(&mut self) { #### WebSocket Handler -For defining a `ws` endpoint, do: +For defining a `ws` endpoint (server-side WebSocket), do: ```rust #[ws] fn handle_websocket(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { - // Process WebSocket messages + // Process WebSocket messages from connected clients } ``` if you have multiple ws endpoints, you can match on the ws endpoints with `get_path()`, which will give you an `Option`. if you want to access the http server, you can call `get_server()`, giving you access to `HttpServer`. +#### WebSocket Client Handler + +For handling WebSocket client connections (when your process acts as a WebSocket client), use: + +```rust +#[ws_client] +fn handle_ws_client(&mut self, channel_id: u32, request: HttpClientRequest) { + match request { + HttpClientRequest::WebSocketPush { channel_id, message_type } => { + // Handle incoming message from the WebSocket server + let blob = get_blob().unwrap(); + // Process the message... + }, + HttpClientRequest::WebSocketClose { channel_id } => { + // Handle connection close + } + } +} +``` + +This handler receives messages from WebSocket servers that your process has connected to using the `http-client:distro:sys` service. + ### Binding Endpoints The `endpoints` parameter configures HTTP and WebSocket endpoints: @@ -450,7 +472,7 @@ struct AsyncRequesterState { Binding::Http { path: "/api", config: HttpBindingConfig::new(false, false, false, None), - }, + }, Binding::Ws { path: "/ws", config: WsBindingConfig::new(false, false, false), @@ -527,22 +549,22 @@ fn search(&mut self) -> Vec { if let Some(params) = get_query_params() { // params is a HashMap with: // {"q" => "rust", "limit" => "20", "sort" => "date"} - + // Get search query (with default) let query = params.get("q") .map(|s| s.to_string()) .unwrap_or_else(|| "".to_string()); - + // Parse numeric parameters let limit = params.get("limit") .and_then(|s| s.parse::().ok()) .unwrap_or(10); - + // Get optional parameters let sort_by = params.get("sort") .map(|s| s.as_str()) .unwrap_or("relevance"); - + // Use the parameters self.perform_search(&query, limit, sort_by) } else { @@ -565,7 +587,7 @@ The macro generates detailed logging for all operations: ```rust // Automatically generated logs help track request flow: // Phase 1: Checking parameter-less handlers for path: '/api/users', method: 'GET' -// Successfully parsed HTTP path: '/api/users' +// Successfully parsed HTTP path: '/api/users' // Set current_path to: Some("/api/users") // Set current_http_method to: Some("GET") ``` @@ -574,11 +596,11 @@ The macro generates detailed logging for all operations: ``` // Wrong handler name -Invalid request format. Expected one of the parameterized handler formats, +Invalid request format. Expected one of the parameterized handler formats, but got: {"WrongHandler":{"message":"test"}} // Invalid JSON syntax -Invalid JSON in request body. Expected: {"CreateUser":[ ...parameters... ]}. +Invalid JSON in request body. Expected: {"CreateUser":[ ...parameters... ]}. Parse error: expected value at line 1 column 1 // Empty body for parameterized handler @@ -659,11 +681,11 @@ Content-Type: application/json } // ✅ This works - body must wrap parameters in handler name -POST /api/users +POST /api/users Content-Type: application/json { "CreateUser": { - "name": "John Doe", + "name": "John Doe", "email": "john@example.com" } } @@ -694,11 +716,11 @@ async fn create_user(&mut self, user: User) -> Result { ... } async fn async_handler(&mut self, data: MyData) -> String { // ✅ Works - context is preserved by the framework let path = get_path().unwrap_or_default(); - + // ⚠️ Potential issue - long-running tasks might lose context tokio::time::sleep(Duration::from_secs(30)).await; let path2 = get_path(); // May be None if context expires - + format!("Path: {}", path) } ``` @@ -749,7 +771,7 @@ fn handle_file_upload(&mut self) -> Result { #[http(method = "POST")] fn create_user(&mut self, user: User) -> User { ... } -#[http(method = "PUT")] +#[http(method = "PUT")] fn create_user(&mut self, user: User) -> User { ... } // ERROR: Duplicate CreateUser variant ``` @@ -767,7 +789,7 @@ fn update_user(&mut self, user: User) -> User { ... } fn user_handler(&mut self, user: User) -> User { match get_http_method().as_deref() { Some("POST") => self.create_user_impl(user), - Some("PUT") => self.update_user_impl(user), + Some("PUT") => self.update_user_impl(user), _ => panic!("Unsupported method") } } @@ -810,7 +832,7 @@ fn health_check(&mut self) -> &'static str { "OK" } #[http(method = "POST", path = "/api/users")] async fn create_specific_user(&mut self, user: NewUser) -> User { ... } -// Medium-priority dynamic handlers +// Medium-priority dynamic handlers #[http(method = "POST")] async fn create_general(&mut self, data: CreateData) -> Response { ... } @@ -823,7 +845,7 @@ fn catch_all(&mut self) -> Response { ... } 1. ✅ Matches `health_check` directly (exact path + method) 2. ❌ No body parsing attempted -**Request: `POST /api/users` with body `{"CreateSpecificUser": {...}}`** +**Request: `POST /api/users` with body `{"CreateSpecificUser": {...}}`** 1. ✅ Matches `create_specific_user` (path + method + body deserialization) 2. ❌ No fallback needed @@ -855,7 +877,7 @@ fn health_check(&mut self) -> &'static str { fn api_router(&mut self) -> Response { match (get_http_method().as_deref(), get_path().as_deref()) { (Some("GET"), Some("/api/users")) => self.list_users(), - (Some("GET"), Some("/api/stats")) => self.get_stats(), + (Some("GET"), Some("/api/stats")) => self.get_stats(), _ => Response::not_found("Endpoint not found") } } @@ -894,7 +916,7 @@ pub enum ApiError { fn validated_handler(&mut self, data: InputData) -> Result { data.validate() .map_err(|e| ApiError::ValidationError(e))?; - + self.process(data) .map_err(|_| ApiError::InternalError) } @@ -908,10 +930,10 @@ struct MyAppState { // ✅ Use reasonable defaults pub counter: u64, pub users: Vec, - + // ✅ Use Options for optional state pub last_sync: Option, - + // ✅ Group related data pub config: AppConfig, } @@ -922,7 +944,7 @@ impl MyAppState { self.counter += 1; self.counter } - + fn add_user(&mut self, user: User) -> Result<(), String> { if self.users.iter().any(|u| u.id == user.id) { return Err("User already exists".to_string()); @@ -944,7 +966,7 @@ async fn fetch_external_data(&mut self, query: String) -> Result &external_service_address(), 10 // timeout in seconds ).await; - + match result { SendResult::Success(response) => Ok(response.data), SendResult::Timeout => Err("Request timed out".to_string()), @@ -974,7 +996,7 @@ The macro will parse arguments like so: fn parse_args(attr_args: MetaList) -> syn::Result { // Parse attributes like name, icon, endpoints, etc. // Validate required parameters - + Ok(HyperProcessArgs { name: name.ok_or_else(|| syn::Error::new(span, "Missing 'name'"))?, icon, @@ -998,7 +1020,7 @@ fn validate_init_method(method: &syn::ImplItemFn) -> syn::Result<()> { "Init method must be declared as async", )); } - + // Check parameter and return types // ... } @@ -1164,7 +1186,7 @@ where { // Generate unique correlation ID let correlation_id = Uuid::new_v4().to_string(); - + // Send request with correlation ID let _ = Request::to(target) .body(serde_json::to_vec(&message).unwrap()) @@ -1174,7 +1196,7 @@ where // Await response with matching correlation ID let response_bytes = ResponseFuture::new(correlation_id).await; - + // Process response... } ``` @@ -1224,7 +1246,7 @@ loop { APP_CONTEXT.with(|ctx| { ctx.borrow_mut().executor.poll_all_tasks(); }); - + // Wait for next message (blocking) match await_message() { // Process message... @@ -1309,11 +1331,11 @@ For each handler, the macro generates dispatch code: ```rust Request::FetchData(id) => { let id_captured = id; // Capture parameter before moving - let state_ptr: *mut MyState = state; - + let state_ptr: *mut MyState = state; + hyper! { let result = unsafe { (*state_ptr).fetch_data(id_captured).await }; - + // For remote/local handlers let resp = Response::new() .body(serde_json::to_vec(&result).unwrap()); @@ -1334,8 +1356,8 @@ wit_bindgen::generate!({ world: #wit_world, generate_unused_types: true, additional_derives: [ - serde::Deserialize, - serde::Serialize, + serde::Deserialize, + serde::Serialize, process_macros::SerdeJsonInto ], }); @@ -1366,7 +1388,7 @@ impl Guest for Component { // Setup server with endpoints let mut server = setup_server(ui_config.as_ref(), &endpoints); - + // Call user's init method if provided if #init_method_ident.is_some() { #init_method_call @@ -1402,19 +1424,19 @@ graph TB classDef external fill:#222222,color:#ffffff,stroke:#444444,stroke-width:1px classDef dataflow fill:#008CBA,color:#ffffff,stroke:#0077A3,stroke-width:1px classDef annotation fill:none,color:#FF6600,stroke:none,stroke-width:0px - + %% BUILD PHASE - Where components are generated subgraph BuildPhase["⚙️ BUILD PHASE"] UserSrc[/"User Source Code #[hyperprocess] macro #[http], #[local], #[remote] methods"/] - + subgraph CodeGen["Code Generation Pipeline"] direction TB - + HyperBindgen["hyper-bindgen CLI Scans for #[hyperprocess]"] - + subgraph BindgenOutputs["hyper-bindgen Outputs"] direction LR WitFiles["WIT Files @@ -1424,98 +1446,98 @@ graph TB EnumStructs["Shared Enums & Structs Cross-process types"] end - + ProcMacro["hyperprocess Macro AST Transformation"] - + subgraph MacroOutputs["Macro Generated Code"] direction LR ReqResEnums["Request/Response Enums - Generated variants per handler - Parameter & return mappings"] - + HandlerDisp["Handler Dispatch Logic - HTTP/Local/Remote routing - Async handler spawning - Message serialization"] - + AsyncRuntime["Async Runtime Components - ResponseFuture impl - Correlation ID system - Executor & task management"] - + MainLoop["Component Implementation - Message loop - Task polling - Error handling"] end end - + %% Dev-time Connections UserSrc --> HyperBindgen UserSrc --> ProcMacro HyperBindgen --> BindgenOutputs ProcMacro --> MacroOutputs - + %% Final Compilation MacroOutputs --> WasmComp["WebAssembly Component WASI Preview 2"] BindgenOutputs --> WasmComp end - + %% RUNTIME PHASE - How processes execute subgraph RuntimePhase["⚡ RUNTIME PHASE"] subgraph Process["Process A"] direction TB - + InMsg[/"Incoming Messages"/] --> MsgLoop["Message Loop await_message()"] - + subgraph ProcessInternals["Process Internals"] direction LR - + MsgLoop --> MsgRouter{"Message Router"} MsgRouter -->|"HTTP"| HttpHandler["HTTP Handlers"] MsgRouter -->|"Local"| LocalHandler["Local Handlers"] MsgRouter -->|"Remote"| RemoteHandler["Remote Handlers"] MsgRouter -->|"WebSocket"| WsHandler["WebSocket Handlers"] MsgRouter -->|"Response"| RespHandler["Response Handler"] - + %% State management HttpHandler & LocalHandler & RemoteHandler & WsHandler --> AppState[("Application State SaveOptions::EveryMessage")] - + %% Async handling RespHandler --> RespRegistry["Response Registry correlation_id → response"] - + CallStub["RPC Stub Calls e.g. increment_counter_rpc()"] end - + %% Asynchronous execution AppState -.->|"Persist"| Storage[(Persistent Storage)] - + MsgLoop -.->|"Poll Tasks"| Executor["Async Executor poll_all_tasks()"] - + ProcessInternals -->|"Generate"| OutMsg[/"Outgoing Messages"/] end - + %% External communication points ExtClient1["HTTP Client"] & ExtClient2["WebSocket Client"] --> InMsg OutMsg --> Process2["Process B"] Process2 --> InMsg end - + %% ASYNC FLOW - Detailed sequence of async communication subgraph AsyncFlow["⚡ ASYNC MESSAGE EXCHANGE"] direction LR - + AF1["1️⃣ Call RPC Stub - increment_counter_rpc(target, 42)"] --> + increment_counter_rpc(target, 42)"] --> AF2["2️⃣ Generate UUID - correlation_id = uuid::new_v4()"] --> + correlation_id = uuid::new_v4()"] --> AF3["3️⃣ Create Future ResponseFuture(correlation_id)"] --> AF4["4️⃣ Send Request @@ -1531,21 +1553,21 @@ graph TB AF9["9️⃣ Future Polling ResponseFuture finds response and completes"] end - + %% KEY CONNECTIONS BETWEEN SECTIONS - + %% Build to Runtime WasmComp ==>|"Load Component"| Process - + %% Runtime to Async Flow CallStub ==>|"Initiates"| AF1 AF9 ==>|"Resume Future in"| Executor RespRegistry ===|"Powers"| AF8 - + %% Annotation for the Correlation ID system CorrelationNote["CORRELATION SYSTEM Tracks request→response with UUIDs"] -.-> RespRegistry - + %% Style elements class UserSrc,WitFiles,CallerUtils,EnumStructs,ReqResEnums,HandlerDisp,AsyncRuntime,MainLoop,WasmComp mainflow class MsgLoop,Executor,RespRegistry,RespHandler,AF2,AF8 accent @@ -1553,7 +1575,7 @@ graph TB class AF1,AF3,AF4,AF5,AF6,AF7,AF9 asyncflow class ExtClient1,ExtClient2,Process2,Storage,InMsg,OutMsg external class CorrelationNote annotation - + %% Subgraph styling style BuildPhase fill:#171717,stroke:#333333,color:#ffffff style CodeGen fill:#222222,stroke:#444444,color:#ffffff @@ -1568,4 +1590,4 @@ graph TB ## Todos -- Let the new kit templates make use of the new framework \ No newline at end of file +- Let the new kit templates make use of the new framework diff --git a/src/lib.rs b/src/lib.rs index 632460d..31a7548 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,12 @@ struct WsMethodDetails { call: proc_macro2::TokenStream, } +/// WebSocket client method details for code generation +struct WsClientMethodDetails { + identifier: proc_macro2::TokenStream, + call: proc_macro2::TokenStream, +} + //------------------------------------------------------------------------------ // Parse Implementation //------------------------------------------------------------------------------ @@ -334,6 +340,7 @@ fn clean_impl_block(impl_block: &ItemImpl) -> ItemImpl { && !attr.path().is_ident("local") && !attr.path().is_ident("remote") && !attr.path().is_ident("ws") + && !attr.path().is_ident("ws_client") }); } } @@ -518,6 +525,61 @@ fn validate_websocket_method(method: &syn::ImplItemFn) -> syn::Result<()> { Ok(()) } +/// Validate the websocket client method signature +fn validate_websocket_client_method(method: &syn::ImplItemFn) -> syn::Result<()> { + // Ensure first param is &mut self + if !has_valid_self_receiver(method) { + return Err(syn::Error::new_spanned( + &method.sig, + "WebSocket client method must take &mut self as first parameter", + )); + } + + // Ensure there are exactly 3 parameters (including &mut self) + if method.sig.inputs.len() != 3 { + return Err(syn::Error::new_spanned( + &method.sig, + "WebSocket client method must take exactly 2 additional parameters: channel_id and request", + )); + } + + // Get parameters (excluding &mut self) + let params: Vec<_> = method.sig.inputs.iter().skip(1).collect(); + + // Check parameter types + let channel_id_param = ¶ms[0]; + let request_param = ¶ms[1]; + + if let syn::FnArg::Typed(pat_type) = channel_id_param { + if !pat_type.ty.to_token_stream().to_string().contains("u32") { + return Err(syn::Error::new_spanned( + pat_type, + "First parameter of WebSocket client method must be channel_id: u32", + )); + } + } + + if let syn::FnArg::Typed(pat_type) = request_param { + let type_str = pat_type.ty.to_token_stream().to_string(); + if !type_str.contains("HttpClientRequest") { + return Err(syn::Error::new_spanned( + pat_type, + "Second parameter of WebSocket client method must be request: HttpClientRequest", + )); + } + } + + // Validate return type (must be unit) + if !matches!(method.sig.output, ReturnType::Default) { + return Err(syn::Error::new_spanned( + &method.sig.output, + "WebSocket client method must not return a value", + )); + } + + Ok(()) +} + /// Validate a request-response function signature fn validate_request_response_function(method: &syn::ImplItemFn) -> syn::Result<()> { // Ensure first param is &mut self @@ -544,11 +606,13 @@ fn analyze_methods( ) -> syn::Result<( Option, // init method Option, // ws method + Option, // ws_client method Vec, // metadata for request/response methods bool, // whether init method contains logging init )> { let mut init_method = None; let mut ws_method = None; + let mut ws_client_method = None; let mut has_init_logging = false; let mut function_metadata = Vec::new(); @@ -562,10 +626,11 @@ fn analyze_methods( let has_local = has_attribute(method, "local"); let has_remote = has_attribute(method, "remote"); let has_ws = has_attribute(method, "ws"); + let has_ws_client = has_attribute(method, "ws_client"); // Handle init method if has_init { - if has_http || has_local || has_remote || has_ws { + if has_http || has_local || has_remote || has_ws || has_ws_client { return Err(syn::Error::new_spanned( method, "#[init] cannot be combined with other attributes", @@ -588,7 +653,7 @@ fn analyze_methods( // Handle WebSocket method if has_ws { - if has_http || has_local || has_remote || has_init { + if has_http || has_local || has_remote || has_init || has_ws_client { return Err(syn::Error::new_spanned( method, "#[ws] cannot be combined with other attributes", @@ -605,6 +670,25 @@ fn analyze_methods( continue; } + // Handle WebSocket client method + if has_ws_client { + if has_http || has_local || has_remote || has_init || has_ws { + return Err(syn::Error::new_spanned( + method, + "#[ws_client] cannot be combined with other attributes", + )); + } + validate_websocket_client_method(method)?; + if ws_client_method.is_some() { + return Err(syn::Error::new_spanned( + method, + "Multiple #[ws_client] methods defined", + )); + } + ws_client_method = Some(ident); + continue; + } + // Handle request-response methods if has_http || has_local || has_remote { validate_request_response_function(method)?; @@ -660,7 +744,13 @@ fn analyze_methods( } } - Ok((init_method, ws_method, function_metadata, has_init_logging)) + Ok(( + init_method, + ws_method, + ws_client_method, + function_metadata, + has_init_logging, + )) } /// Extract metadata from a function @@ -1085,6 +1175,26 @@ fn ws_method_opt_to_call(ws_method: &Option) -> proc_macro2::TokenSt } } +/// Convert optional WebSocket client method to token stream for identifier +fn ws_client_method_opt_to_token( + ws_client_method: &Option, +) -> proc_macro2::TokenStream { + if let Some(method_name) = ws_client_method { + quote! { Some(stringify!(#method_name)) } + } else { + quote! { None::<&str> } + } +} + +/// Convert optional WebSocket client method to token stream for method call +fn ws_client_method_opt_to_call(ws_client_method: &Option) -> proc_macro2::TokenStream { + if let Some(method_name) = ws_client_method { + quote! { unsafe { (*state).#method_name(channel_id, request) }; } + } else { + quote! {} + } +} + //------------------------------------------------------------------------------ // HTTP Helper Functions //------------------------------------------------------------------------------ @@ -1420,6 +1530,55 @@ fn generate_http_handler_dispatcher( // WebSocket Helper Functions //------------------------------------------------------------------------------ +/// Generate WebSocket client message handler +fn generate_websocket_client_handler( + ws_client_method_call: &proc_macro2::TokenStream, + self_ty: &Box, +) -> proc_macro2::TokenStream { + quote! { + hyperware_process_lib::logging::debug!("Processing WebSocket client message from: {:?}", message.source()); + + let blob_opt = message.blob(); + + match serde_json::from_slice::(message.body()) { + Ok(request) => { + match request { + hyperware_process_lib::http::client::HttpClientRequest::WebSocketPush { ref channel_id, ref message_type } => { + hyperware_process_lib::logging::debug!("Received WebSocket client push on channel {}, type: {:?}", channel_id, message_type); + + let Some(blob) = blob_opt else { + hyperware_process_lib::logging::error!( + "Failed to get blob for WebSocket client push on channel {}. This indicates a malformed WebSocket message.", + channel_id + ); + return; + }; + + hyperware_process_lib::logging::debug!("Processing WebSocket client message with {} bytes", blob.bytes.len()); + + #ws_client_method_call + + unsafe { + hyperware_process_lib::hyperapp::maybe_save_state(&mut *state); + } + }, + hyperware_process_lib::http::client::HttpClientRequest::WebSocketClose { channel_id } => { + hyperware_process_lib::logging::debug!("WebSocket client connection closed on channel {}", channel_id); + } + } + }, + Err(e) => { + hyperware_process_lib::logging::error!( + "Failed to parse WebSocket client request: {}\n\ + Source: {:?}\n\ + This usually indicates a malformed message from the http-client service.", + e, message.source() + ); + } + } + } +} + /// Generate WebSocket message handler fn generate_websocket_handler( ws_method_call: &proc_macro2::TokenStream, @@ -1536,6 +1695,7 @@ fn generate_message_handlers( self_ty: &Box, handler_arms: &HandlerDispatch, ws_method_call: &proc_macro2::TokenStream, + ws_client_method_call: &proc_macro2::TokenStream, http_handlers: &[&FunctionMetadata], ) -> proc_macro2::TokenStream { let http_request_match_arms = &handler_arms.http; @@ -1547,12 +1707,18 @@ fn generate_message_handlers( let http_dispatcher = generate_http_handler_dispatcher(http_handlers, self_ty, http_request_match_arms); let websocket_handlers = generate_websocket_handler(ws_method_call, self_ty); + let websocket_client_handler = + generate_websocket_client_handler(ws_client_method_call, self_ty); let local_message_handler = generate_local_message_handler(self_ty, local_and_remote_request_match_arms); let remote_message_handler = generate_remote_message_handler(self_ty, remote_request_match_arms); quote! { + /// Handle WebSocket client messages + fn handle_websocket_client_message(state: *mut #self_ty, message: hyperware_process_lib::Message) { + #websocket_client_handler + } /// Handle messages from the HTTP server fn handle_http_server_message(state: *mut #self_ty, message: hyperware_process_lib::Message) { let blob_opt = message.blob(); @@ -1607,6 +1773,7 @@ fn generate_component_impl( response_enum: &proc_macro2::TokenStream, init_method_details: &InitMethodDetails, ws_method_details: &WsMethodDetails, + ws_client_method_details: &WsClientMethodDetails, handler_arms: &HandlerDispatch, has_init_logging: bool, http_handlers: &[&FunctionMetadata], @@ -1641,10 +1808,16 @@ fn generate_component_impl( let init_method_ident = &init_method_details.identifier; let init_method_call = &init_method_details.call; let ws_method_call = &ws_method_details.call; + let ws_client_method_call = &ws_client_method_details.call; // Generate message handler functions - let message_handlers = - generate_message_handlers(self_ty, handler_arms, ws_method_call, http_handlers); + let message_handlers = generate_message_handlers( + self_ty, + handler_arms, + ws_method_call, + ws_client_method_call, + http_handlers, + ); // Generate the logging initialization conditionally let logging_init = if !has_init_logging { @@ -1750,6 +1923,8 @@ fn generate_component_impl( hyperware_process_lib::Message::Request { .. } => { if message.is_local() && message.source().process == "http-server:distro:sys" { handle_http_server_message(&mut state, message); + } else if message.is_local() && message.source().process == "http-client:distro:sys" { + handle_websocket_client_message(&mut state, message); } else if message.is_local() { handle_local_message(&mut state, message); } else { @@ -1804,7 +1979,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { let self_ty = &impl_block.self_ty; // Analyze the methods in the implementation block - let (init_method, ws_method, function_metadata, has_init_logging) = + let (init_method, ws_method, ws_client_method, function_metadata, has_init_logging) = match analyze_methods(&impl_block) { Ok(methods) => methods, Err(e) => return e.to_compile_error().into(), @@ -1861,6 +2036,12 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { call: ws_method_opt_to_call(&ws_method), }; + // Prepare WebSocket client method details for code generation + let ws_client_method_details = WsClientMethodDetails { + identifier: ws_client_method_opt_to_token(&ws_client_method), + call: ws_client_method_opt_to_call(&ws_client_method), + }; + // Generate the final output generate_component_impl( &args, @@ -1870,6 +2051,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { &response_enum, &init_method_details, &ws_method_details, + &ws_client_method_details, &handler_arms, has_init_logging, &handlers.http, From 1e0aa03f447cabf081301c95a25222bcc0790f17 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 18 Aug 2025 18:01:03 -0700 Subject: [PATCH 2/8] fix channel_id type --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 31a7548..5b638a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1555,6 +1555,7 @@ fn generate_websocket_client_handler( }; hyperware_process_lib::logging::debug!("Processing WebSocket client message with {} bytes", blob.bytes.len()); + let channel_id = channel_id.clone(); #ws_client_method_call From fa7f250a76ba564013f51465bfba0208aca0c269 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 04:49:24 -0700 Subject: [PATCH 3/8] respond to ws Pings with Pongs --- src/lib.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 5b638a1..b626c1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1546,6 +1546,16 @@ fn generate_websocket_client_handler( hyperware_process_lib::http::client::HttpClientRequest::WebSocketPush { ref channel_id, ref message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket client push on channel {}, type: {:?}", channel_id, message_type); + if message_type == &hyperware_process_lib::http::Server::WsMessageType::Ping { + // Respond to Pings with Pongs + hyperware_process_lib::http::client::send_ws_client_push( + channel_id.clone(), + hyperware_process_lib::http::Server::WsMessageType::Pong, + hyperware_process_lib::LazyLoadBlob::default(), + ); + return; + } + let Some(blob) = blob_opt else { hyperware_process_lib::logging::error!( "Failed to get blob for WebSocket client push on channel {}. This indicates a malformed WebSocket message.", @@ -1589,6 +1599,16 @@ fn generate_websocket_handler( hyperware_process_lib::http::server::HttpServerRequest::WebSocketPush { channel_id, message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket message on channel {}, type: {:?}", channel_id, message_type); + if message_type == hyperware_process_lib::http::Server::WsMessageType::Ping { + // Respond to Pings with Pongs + hyperware_process_lib::http::server::send_ws_client_push( + channel_id, + hyperware_process_lib::http::Server::WsMessageType::Pong, + hyperware_process_lib::LazyLoadBlob::default(), + ); + return; + } + let Some(blob) = blob_opt else { hyperware_process_lib::logging::error!( "Failed to get blob for WebSocketPush on channel {}. This indicates a malformed WebSocket message.", From 3c6c154d8c63b46f857dc2790afa77494384cf2f Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 04:50:35 -0700 Subject: [PATCH 4/8] fix typo --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b626c1b..5a37dff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1601,7 +1601,7 @@ fn generate_websocket_handler( if message_type == hyperware_process_lib::http::Server::WsMessageType::Ping { // Respond to Pings with Pongs - hyperware_process_lib::http::server::send_ws_client_push( + hyperware_process_lib::http::server::send_ws_push( channel_id, hyperware_process_lib::http::Server::WsMessageType::Pong, hyperware_process_lib::LazyLoadBlob::default(), From 263fd18fa60f2cc77998559e4783a7b56e8e2cd3 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 04:51:40 -0700 Subject: [PATCH 5/8] fix more typos --- src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5a37dff..e73fc56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1546,11 +1546,11 @@ fn generate_websocket_client_handler( hyperware_process_lib::http::client::HttpClientRequest::WebSocketPush { ref channel_id, ref message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket client push on channel {}, type: {:?}", channel_id, message_type); - if message_type == &hyperware_process_lib::http::Server::WsMessageType::Ping { + if message_type == &hyperware_process_lib::http::server::WsMessageType::Ping { // Respond to Pings with Pongs hyperware_process_lib::http::client::send_ws_client_push( channel_id.clone(), - hyperware_process_lib::http::Server::WsMessageType::Pong, + hyperware_process_lib::http::server::WsMessageType::Pong, hyperware_process_lib::LazyLoadBlob::default(), ); return; @@ -1599,11 +1599,11 @@ fn generate_websocket_handler( hyperware_process_lib::http::server::HttpServerRequest::WebSocketPush { channel_id, message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket message on channel {}, type: {:?}", channel_id, message_type); - if message_type == hyperware_process_lib::http::Server::WsMessageType::Ping { + if message_type == hyperware_process_lib::http::server::WsMessageType::Ping { // Respond to Pings with Pongs hyperware_process_lib::http::server::send_ws_push( channel_id, - hyperware_process_lib::http::Server::WsMessageType::Pong, + hyperware_process_lib::http::server::WsMessageType::Pong, hyperware_process_lib::LazyLoadBlob::default(), ); return; From 1fc980ffe155bdce4798cbf6aeb7ce1677e6ba39 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 04:55:10 -0700 Subject: [PATCH 6/8] dont overwrite the ws blob --- src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e73fc56..fbb197b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1536,8 +1536,6 @@ fn generate_websocket_client_handler( self_ty: &Box, ) -> proc_macro2::TokenStream { quote! { - hyperware_process_lib::logging::debug!("Processing WebSocket client message from: {:?}", message.source()); - let blob_opt = message.blob(); match serde_json::from_slice::(message.body()) { From 19a2f48bbf0ed02d278f08678d318215521f5933 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 05:10:54 -0700 Subject: [PATCH 7/8] make `#[ws_client]` have same fn signature as `#[ws]` --- README.md | 16 ++++++++++----- src/lib.rs | 58 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index f1f1058..177787c 100644 --- a/README.md +++ b/README.md @@ -407,21 +407,27 @@ For handling WebSocket client connections (when your process acts as a WebSocket ```rust #[ws_client] -fn handle_ws_client(&mut self, channel_id: u32, request: HttpClientRequest) { - match request { - HttpClientRequest::WebSocketPush { channel_id, message_type } => { +fn handle_ws_client(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { + match message_type { + WsMessageType::Text | WsMessageType::Binary => { // Handle incoming message from the WebSocket server - let blob = get_blob().unwrap(); + // The blob contains the message data + let data = String::from_utf8_lossy(&blob.bytes); // Process the message... }, - HttpClientRequest::WebSocketClose { channel_id } => { + WsMessageType::Close => { // Handle connection close + // blob will be empty for close messages + }, + _ => { + // Handle other message types (Ping/Pong are handled automatically) } } } ``` This handler receives messages from WebSocket servers that your process has connected to using the `http-client:distro:sys` service. +The signature matches that of `#[ws]` for consistency. ### Binding Endpoints diff --git a/src/lib.rs b/src/lib.rs index fbb197b..ccf41ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -535,20 +535,21 @@ fn validate_websocket_client_method(method: &syn::ImplItemFn) -> syn::Result<()> )); } - // Ensure there are exactly 3 parameters (including &mut self) - if method.sig.inputs.len() != 3 { + // Ensure there are exactly 4 parameters (including &mut self) + if method.sig.inputs.len() != 4 { return Err(syn::Error::new_spanned( &method.sig, - "WebSocket client method must take exactly 2 additional parameters: channel_id and request", + "WebSocket client method must take exactly 3 additional parameters: channel_id, message_type, and blob", )); } // Get parameters (excluding &mut self) let params: Vec<_> = method.sig.inputs.iter().skip(1).collect(); - // Check parameter types + // Check parameter types (we're not doing exact type checking, just rough check) let channel_id_param = ¶ms[0]; - let request_param = ¶ms[1]; + let message_type_param = ¶ms[1]; + let blob_param = ¶ms[2]; if let syn::FnArg::Typed(pat_type) = channel_id_param { if !pat_type.ty.to_token_stream().to_string().contains("u32") { @@ -559,12 +560,26 @@ fn validate_websocket_client_method(method: &syn::ImplItemFn) -> syn::Result<()> } } - if let syn::FnArg::Typed(pat_type) = request_param { + if let syn::FnArg::Typed(pat_type) = message_type_param { let type_str = pat_type.ty.to_token_stream().to_string(); - if !type_str.contains("HttpClientRequest") { + if !type_str.contains("WsMessageType") && !type_str.contains("MessageType") { + return Err(syn::Error::new_spanned( + pat_type, + "Second parameter of WebSocket client method must be message_type: WsMessageType", + )); + } + } + + if let syn::FnArg::Typed(pat_type) = blob_param { + if !pat_type + .ty + .to_token_stream() + .to_string() + .contains("LazyLoadBlob") + { return Err(syn::Error::new_spanned( pat_type, - "Second parameter of WebSocket client method must be request: HttpClientRequest", + "Third parameter of WebSocket client method must be blob: LazyLoadBlob", )); } } @@ -1189,7 +1204,7 @@ fn ws_client_method_opt_to_token( /// Convert optional WebSocket client method to token stream for method call fn ws_client_method_opt_to_call(ws_client_method: &Option) -> proc_macro2::TokenStream { if let Some(method_name) = ws_client_method { - quote! { unsafe { (*state).#method_name(channel_id, request) }; } + quote! { unsafe { (*state).#method_name(channel_id, message_type, blob) }; } } else { quote! {} } @@ -1541,13 +1556,17 @@ fn generate_websocket_client_handler( match serde_json::from_slice::(message.body()) { Ok(request) => { match request { - hyperware_process_lib::http::client::HttpClientRequest::WebSocketPush { ref channel_id, ref message_type } => { + hyperware_process_lib::http::client::HttpClientRequest::WebSocketPush { channel_id, message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket client push on channel {}, type: {:?}", channel_id, message_type); - if message_type == &hyperware_process_lib::http::server::WsMessageType::Ping { + if message_type == hyperware_process_lib::http::server::WsMessageType::Pong { + return; + } + + if message_type == hyperware_process_lib::http::server::WsMessageType::Ping { // Respond to Pings with Pongs hyperware_process_lib::http::client::send_ws_client_push( - channel_id.clone(), + channel_id, hyperware_process_lib::http::server::WsMessageType::Pong, hyperware_process_lib::LazyLoadBlob::default(), ); @@ -1563,7 +1582,6 @@ fn generate_websocket_client_handler( }; hyperware_process_lib::logging::debug!("Processing WebSocket client message with {} bytes", blob.bytes.len()); - let channel_id = channel_id.clone(); #ws_client_method_call @@ -1573,6 +1591,16 @@ fn generate_websocket_client_handler( }, hyperware_process_lib::http::client::HttpClientRequest::WebSocketClose { channel_id } => { hyperware_process_lib::logging::debug!("WebSocket client connection closed on channel {}", channel_id); + + // Call the handler with a special Close message type and empty blob + let message_type = hyperware_process_lib::http::server::WsMessageType::Close; + let blob = hyperware_process_lib::LazyLoadBlob::default(); + + #ws_client_method_call + + unsafe { + hyperware_process_lib::hyperapp::maybe_save_state(&mut *state); + } } } }, @@ -1597,6 +1625,10 @@ fn generate_websocket_handler( hyperware_process_lib::http::server::HttpServerRequest::WebSocketPush { channel_id, message_type } => { hyperware_process_lib::logging::debug!("Received WebSocket message on channel {}, type: {:?}", channel_id, message_type); + if message_type == hyperware_process_lib::http::server::WsMessageType::Pong { + return; + } + if message_type == hyperware_process_lib::http::server::WsMessageType::Ping { // Respond to Pings with Pongs hyperware_process_lib::http::server::send_ws_push( From ed99c19c1e4f511786b2c827fb909ba3d3950238 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Tue, 19 Aug 2025 14:06:46 -0700 Subject: [PATCH 8/8] allow async #[ws] --- README.md | 24 ++++++++++++++-- src/lib.rs | 84 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 86 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 177787c..fee7b64 100644 --- a/README.md +++ b/README.md @@ -392,20 +392,30 @@ async fn initialize(&mut self) { For defining a `ws` endpoint (server-side WebSocket), do: ```rust +// Synchronous WebSocket handler #[ws] fn handle_websocket(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { // Process WebSocket messages from connected clients } + +// Asynchronous WebSocket handler +#[ws] +async fn handle_websocket_async(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { + // Process WebSocket messages asynchronously + // Can make async calls to other services + let result = some_async_operation().await; +} ``` -if you have multiple ws endpoints, you can match on the ws endpoints with `get_path()`, which will give you an `Option`. -if you want to access the http server, you can call `get_server()`, giving you access to `HttpServer`. +Both sync and async variants are supported. If you have multiple ws endpoints, you can match on the ws endpoints with `get_path()`, which will give you an `Option`. +If you want to access the http server, you can call `get_server()`, giving you access to `HttpServer`. #### WebSocket Client Handler For handling WebSocket client connections (when your process acts as a WebSocket client), use: ```rust +// Synchronous WebSocket client handler #[ws_client] fn handle_ws_client(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { match message_type { @@ -424,9 +434,17 @@ fn handle_ws_client(&mut self, channel_id: u32, message_type: WsMessageType, blo } } } + +// Asynchronous WebSocket client handler +#[ws_client] +async fn handle_ws_client_async(&mut self, channel_id: u32, message_type: WsMessageType, blob: LazyLoadBlob) { + // Process WebSocket client messages asynchronously + let processed_data = async_process_message(&blob).await; + // Send response back if needed... +} ``` -This handler receives messages from WebSocket servers that your process has connected to using the `http-client:distro:sys` service. +Both sync and async variants are supported. This handler receives messages from WebSocket servers that your process has connected to using the `http-client:distro:sys` service. The signature matches that of `#[ws]` for consistency. ### Binding Endpoints diff --git a/src/lib.rs b/src/lib.rs index ccf41ff..427face 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,6 +114,20 @@ struct InitMethodDetails { call: proc_macro2::TokenStream, } +/// WebSocket method info from analysis +#[derive(Clone)] +struct WsMethodInfo { + name: syn::Ident, + is_async: bool, +} + +/// WebSocket client method info from analysis +#[derive(Clone)] +struct WsClientMethodInfo { + name: syn::Ident, + is_async: bool, +} + /// WebSocket method details for code generation struct WsMethodDetails { identifier: proc_macro2::TokenStream, @@ -619,11 +633,11 @@ fn validate_request_response_function(method: &syn::ImplItemFn) -> syn::Result<( fn analyze_methods( impl_block: &ItemImpl, ) -> syn::Result<( - Option, // init method - Option, // ws method - Option, // ws_client method - Vec, // metadata for request/response methods - bool, // whether init method contains logging init + Option, // init method + Option, // ws method + Option, // ws_client method + Vec, // metadata for request/response methods + bool, // whether init method contains logging init )> { let mut init_method = None; let mut ws_method = None; @@ -681,7 +695,10 @@ fn analyze_methods( "Multiple #[ws] methods defined", )); } - ws_method = Some(ident); + ws_method = Some(WsMethodInfo { + name: ident, + is_async: method.sig.asyncness.is_some(), + }); continue; } @@ -700,7 +717,10 @@ fn analyze_methods( "Multiple #[ws_client] methods defined", )); } - ws_client_method = Some(ident); + ws_client_method = Some(WsClientMethodInfo { + name: ident, + is_async: method.sig.asyncness.is_some(), + }); continue; } @@ -1173,8 +1193,9 @@ fn init_method_opt_to_call( } /// Convert optional WebSocket method to token stream for identifier -fn ws_method_opt_to_token(ws_method: &Option) -> proc_macro2::TokenStream { - if let Some(method_name) = ws_method { +fn ws_method_opt_to_token(ws_method: &Option) -> proc_macro2::TokenStream { + if let Some(method_info) = ws_method { + let method_name = &method_info.name; quote! { Some(stringify!(#method_name)) } } else { quote! { None::<&str> } @@ -1182,9 +1203,21 @@ fn ws_method_opt_to_token(ws_method: &Option) -> proc_macro2::TokenS } /// Convert optional WebSocket method to token stream for method call -fn ws_method_opt_to_call(ws_method: &Option) -> proc_macro2::TokenStream { - if let Some(method_name) = ws_method { - quote! { unsafe { (*state).#method_name(channel_id, message_type, blob) }; } +fn ws_method_opt_to_call(ws_method: &Option, self_ty: &Box) -> proc_macro2::TokenStream { + if let Some(method_info) = ws_method { + let method_name = &method_info.name; + if method_info.is_async { + quote! { + // Create a raw pointer to state for use in the async block + let state_ptr: *mut #self_ty = state; + hyperware_process_lib::hyperapp::run_async! { + // Inside the async block, use the pointer to access state + unsafe { (*state_ptr).#method_name(channel_id, message_type, blob).await }; + } + } + } else { + quote! { unsafe { (*state).#method_name(channel_id, message_type, blob) }; } + } } else { quote! {} } @@ -1192,9 +1225,10 @@ fn ws_method_opt_to_call(ws_method: &Option) -> proc_macro2::TokenSt /// Convert optional WebSocket client method to token stream for identifier fn ws_client_method_opt_to_token( - ws_client_method: &Option, + ws_client_method: &Option, ) -> proc_macro2::TokenStream { - if let Some(method_name) = ws_client_method { + if let Some(method_info) = ws_client_method { + let method_name = &method_info.name; quote! { Some(stringify!(#method_name)) } } else { quote! { None::<&str> } @@ -1202,9 +1236,21 @@ fn ws_client_method_opt_to_token( } /// Convert optional WebSocket client method to token stream for method call -fn ws_client_method_opt_to_call(ws_client_method: &Option) -> proc_macro2::TokenStream { - if let Some(method_name) = ws_client_method { - quote! { unsafe { (*state).#method_name(channel_id, message_type, blob) }; } +fn ws_client_method_opt_to_call(ws_client_method: &Option, self_ty: &Box) -> proc_macro2::TokenStream { + if let Some(method_info) = ws_client_method { + let method_name = &method_info.name; + if method_info.is_async { + quote! { + // Create a raw pointer to state for use in the async block + let state_ptr: *mut #self_ty = state; + hyperware_process_lib::hyperapp::run_async! { + // Inside the async block, use the pointer to access state + unsafe { (*state_ptr).#method_name(channel_id, message_type, blob).await }; + } + } + } else { + quote! { unsafe { (*state).#method_name(channel_id, message_type, blob) }; } + } } else { quote! {} } @@ -2084,13 +2130,13 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { // Prepare WebSocket method details for code generation let ws_method_details = WsMethodDetails { identifier: ws_method_opt_to_token(&ws_method), - call: ws_method_opt_to_call(&ws_method), + call: ws_method_opt_to_call(&ws_method, self_ty), }; // Prepare WebSocket client method details for code generation let ws_client_method_details = WsClientMethodDetails { identifier: ws_client_method_opt_to_token(&ws_client_method), - call: ws_client_method_opt_to_call(&ws_client_method), + call: ws_client_method_opt_to_call(&ws_client_method, self_ty), }; // Generate the final output