From 9c6455416cca7494e922b414d5fd0377a0d24dfd Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Wed, 3 Sep 2025 15:18:05 -0400 Subject: [PATCH 1/6] added eth handler --- src/lib.rs | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 107 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9541edb..bf7fd26 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,7 @@ struct FunctionMetadata { is_local: bool, // Has #[local] attribute is_remote: bool, // Has #[remote] attribute is_http: bool, // Has #[http] attribute + is_eth: bool, // Has #[eth] attribute http_methods: Vec, // HTTP methods this handler accepts (GET, POST, etc.) http_path: Option, // Specific path this handler is bound to (optional) } @@ -56,6 +57,7 @@ enum HandlerType { Local, Remote, Http, + Eth, } /// Grouped handlers by type @@ -63,6 +65,7 @@ struct HandlerGroups<'a> { local: Vec<&'a FunctionMetadata>, remote: Vec<&'a FunctionMetadata>, http: Vec<&'a FunctionMetadata>, + eth: Vec<&'a FunctionMetadata>, // New group for combined handlers (used for local messages that can also use remote handlers) local_and_remote: Vec<&'a FunctionMetadata>, } @@ -78,6 +81,9 @@ impl<'a> HandlerGroups<'a> { // Collect HTTP handlers let http: Vec<_> = metadata.iter().filter(|f| f.is_http).collect(); + // Collect ETH handler (there can only be one, but kept this for consistency) + let eth: Vec<_> = metadata.iter().filter(|f| f.is_eth).collect(); + // Create a combined list of local and remote handlers for local messages // We first include all local handlers, then add remote handlers that aren't already covered let mut local_and_remote = local.clone(); @@ -95,6 +101,7 @@ impl<'a> HandlerGroups<'a> { local, remote, http, + eth, local_and_remote, } } @@ -105,6 +112,7 @@ struct HandlerDispatch { local: proc_macro2::TokenStream, remote: proc_macro2::TokenStream, http: proc_macro2::TokenStream, + eth: proc_macro2::TokenStream, local_and_remote: proc_macro2::TokenStream, } @@ -128,6 +136,13 @@ struct WsClientMethodInfo { is_async: bool, } +/// ETH method info from analysis +#[derive(Clone)] +struct EthMethodInfo { + name: syn::Ident, + is_async: bool, +} + /// WebSocket method details for code generation struct WsMethodDetails { identifier: proc_macro2::TokenStream, @@ -353,6 +368,7 @@ fn clean_impl_block(impl_block: &ItemImpl) -> ItemImpl { && !attr.path().is_ident("http") && !attr.path().is_ident("local") && !attr.path().is_ident("remote") + && !attr.path().is_ident("eth") && !attr.path().is_ident("ws") && !attr.path().is_ident("ws_client") }); @@ -636,12 +652,14 @@ fn analyze_methods( Option, // init method Option, // ws method Option, // ws_client method + Option, // eth method Vec, // metadata for request/response methods bool, // whether init method contains logging init )> { let mut init_method = None; let mut ws_method = None; let mut ws_client_method = None; + let mut eth_method = None; let mut has_init_logging = false; let mut function_metadata = Vec::new(); @@ -654,12 +672,13 @@ fn analyze_methods( let has_http = has_attribute(method, "http"); let has_local = has_attribute(method, "local"); let has_remote = has_attribute(method, "remote"); + let has_eth = has_attribute(method, "eth"); let has_ws = has_attribute(method, "ws"); let has_ws_client = has_attribute(method, "ws_client"); // Handle init method if has_init { - if has_http || has_local || has_remote || has_ws || has_ws_client { + if has_http || has_local || has_remote || has_eth || has_ws || has_ws_client { return Err(syn::Error::new_spanned( method, "#[init] cannot be combined with other attributes", @@ -682,7 +701,7 @@ fn analyze_methods( // Handle WebSocket method if has_ws { - if has_http || has_local || has_remote || has_init || has_ws_client { + if has_http || has_local || has_remote || has_eth || has_init || has_ws_client { return Err(syn::Error::new_spanned( method, "#[ws] cannot be combined with other attributes", @@ -704,7 +723,7 @@ fn analyze_methods( // Handle WebSocket client method if has_ws_client { - if has_http || has_local || has_remote || has_init || has_ws { + if has_http || has_local || has_remote || has_eth || has_init || has_ws { return Err(syn::Error::new_spanned( method, "#[ws_client] cannot be combined with other attributes", @@ -724,10 +743,32 @@ fn analyze_methods( continue; } + // Handle ETH method + if has_eth { + if has_http || has_local || has_remote || has_init || has_ws || has_ws_client { + return Err(syn::Error::new_spanned( + method, + "#[eth] cannot be combined with other attributes", + )); + } + validate_request_response_function(method)?; + if eth_method.is_some() { + return Err(syn::Error::new_spanned( + method, + "Multiple #[eth] methods defined", + )); + } + eth_method = Some(EthMethodInfo { + name: ident.clone(), + is_async: method.sig.asyncness.is_some(), + }); + // Continue with regular processing for function metadata + } + // Handle request-response methods - if has_http || has_local || has_remote { + if has_http || has_local || has_remote || has_eth { validate_request_response_function(method)?; - let metadata = extract_function_metadata(method, has_local, has_remote, has_http); + let metadata = extract_function_metadata(method, has_local, has_remote, has_http, has_eth); // Parameter-less HTTP handlers can optionally specify a path, but it's not required // They can use get_path() and get_method() to handle requests dynamically @@ -783,6 +824,7 @@ fn analyze_methods( init_method, ws_method, ws_client_method, + eth_method, function_metadata, has_init_logging, )) @@ -794,6 +836,7 @@ fn extract_function_metadata( is_local: bool, is_remote: bool, is_http: bool, + is_eth: bool, ) -> FunctionMetadata { let ident = method.sig.ident.clone(); @@ -837,6 +880,7 @@ fn extract_function_metadata( is_local, is_remote, is_http, + is_eth, http_methods, http_path, } @@ -967,6 +1011,7 @@ fn generate_handler_dispatch( HandlerType::Local => "No local handlers defined but received a local request", HandlerType::Remote => "No remote handlers defined but received a remote request", HandlerType::Http => "No HTTP handlers defined but received an HTTP request", + HandlerType::Eth => "No ETH handlers defined but received an ETH request", }; return quote! { hyperware_process_lib::logging::warn!(#message); @@ -977,6 +1022,7 @@ fn generate_handler_dispatch( HandlerType::Local => "local", HandlerType::Remote => "remote", HandlerType::Http => "http", + HandlerType::Eth => "eth", }; let dispatch_arms = handlers @@ -1033,6 +1079,14 @@ fn generate_response_handling( resp.send().unwrap(); } } + HandlerType::Eth => { + quote! { + // Instead of wrapping in HPMResponse enum, directly serialize the result + let resp = hyperware_process_lib::Response::new() + .body(serde_json::to_vec(&result).unwrap()); + resp.send().unwrap(); + } + } HandlerType::Http => { quote! { // Instead of wrapping in HPMResponse enum, directly serialize the result @@ -1800,10 +1854,12 @@ fn generate_message_handlers( ws_method_call: &proc_macro2::TokenStream, ws_client_method_call: &proc_macro2::TokenStream, http_handlers: &[&FunctionMetadata], + eth_handlers: &[&FunctionMetadata], ) -> proc_macro2::TokenStream { let http_request_match_arms = &handler_arms.http; let local_and_remote_request_match_arms = &handler_arms.local_and_remote; let remote_request_match_arms = &handler_arms.remote; + let eth_request_match_arms = &handler_arms.eth; let http_context_setup = generate_http_context_setup(); let http_request_parsing = generate_http_request_parsing(); @@ -1816,6 +1872,8 @@ fn generate_message_handlers( generate_local_message_handler(self_ty, local_and_remote_request_match_arms); let remote_message_handler = generate_remote_message_handler(self_ty, remote_request_match_arms); + let eth_message_handler = + generate_eth_message_handler(self_ty, eth_request_match_arms); quote! { /// Handle WebSocket client messages @@ -1854,6 +1912,40 @@ fn generate_message_handlers( #local_message_handler #remote_message_handler + #eth_message_handler + } +} + +/// Generate ETH message handler +fn generate_eth_message_handler( + self_ty: &Box, + match_arms: &proc_macro2::TokenStream, +) -> proc_macro2::TokenStream { + quote! { + /// Handle ETH messages + fn handle_eth_message(state: *mut #self_ty, message: hyperware_process_lib::Message) { + hyperware_process_lib::logging::debug!("Processing ETH message from: {:?}", message.source()); + match serde_json::from_slice::(message.body()) { + Ok(request) => { + unsafe { + #match_arms + hyperware_process_lib::hyperapp::maybe_save_state(&mut *state); + } + }, + Err(e) => { + let raw_body = String::from_utf8_lossy(message.body()); + hyperware_process_lib::logging::error!( + "Failed to deserialize ETH request into HPMRequest enum.\n\ + Error: {}\n\ + Source: {:?}\n\ + Body: {}\n\ + \n\ + 💡 This usually means the message format doesn't match any of your #[eth] handlers.", + e, message.source(), raw_body + ); + } + } + } } } @@ -1880,6 +1972,7 @@ fn generate_component_impl( handler_arms: &HandlerDispatch, has_init_logging: bool, http_handlers: &[&FunctionMetadata], + eth_handlers: &[&FunctionMetadata], ) -> proc_macro2::TokenStream { // Extract values from args for use in the quote macro let name = &args.name; @@ -1920,6 +2013,7 @@ fn generate_component_impl( ws_method_call, ws_client_method_call, http_handlers, + eth_handlers, ); // Generate the logging initialization conditionally @@ -2028,6 +2122,8 @@ fn generate_component_impl( handle_http_server_message(&mut state, message); } else if message.is_local() && message.source().process == "http-client:distro:sys" { handle_websocket_client_message(&mut state, message); + } else if message.is_local() && message.source().process == "eth:distro:sys" { + handle_eth_message(&mut state, message); } else if message.is_local() { handle_local_message(&mut state, message); } else { @@ -2082,7 +2178,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { let self_ty = &impl_block.self_ty; // Analyze the methods in the implementation block - let (init_method, ws_method, ws_client_method, function_metadata, has_init_logging) = + let (init_method, ws_method, ws_client_method, eth_method, function_metadata, has_init_logging) = match analyze_methods(&impl_block) { Ok(methods) => methods, Err(e) => return e.to_compile_error().into(), @@ -2113,6 +2209,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { self_ty, HandlerType::Local, ), + eth: generate_handler_dispatch(&handlers.eth, self_ty, HandlerType::Eth), }; // Clean the implementation block @@ -2136,6 +2233,9 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { call: ws_client_method_opt_to_call(&ws_client_method, self_ty), }; + // Note: ETH handlers don't need special method details like WebSocket handlers + // They work through the standard request-response dispatch system + // Generate the final output generate_component_impl( &args, @@ -2149,6 +2249,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { &handler_arms, has_init_logging, &handlers.http, + &handlers.eth, ) .into() } From 0fa4de105e8e89aa7723199f6309fed8bdcf800e Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Wed, 3 Sep 2025 15:23:11 -0400 Subject: [PATCH 2/6] removed extra comment --- src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bf7fd26..cb45d48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2233,8 +2233,7 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { call: ws_client_method_opt_to_call(&ws_client_method, self_ty), }; - // Note: ETH handlers don't need special method details like WebSocket handlers - // They work through the standard request-response dispatch system + // Generate the final output generate_component_impl( From 246ebeaa3498ce8aec34739fa7a9120070ae1f16 Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Wed, 3 Sep 2025 17:04:19 -0400 Subject: [PATCH 3/6] stricter eth handler signature --- README.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- src/lib.rs | 48 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 102 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fee7b64..429a006 100644 --- a/README.md +++ b/README.md @@ -135,13 +135,14 @@ Example: ### Handler Types -Hyperware processes can handle three types of requests, specified by attributes: +Hyperware processes can handle four types of requests, specified by attributes: | Attribute | Description | |-----------|-------------| | `#[local]` | Handles local (same-node) requests | | `#[remote]` | Handles remote (cross-node) requests | | `#[http]` | Handles ALL HTTP requests (GET, POST, PUT, DELETE, etc.) | +| `#[eth]` | Handles Ethereum subscription updates from your RPC provider | These attributes can be combined to make a handler respond to multiple request types: @@ -366,7 +367,9 @@ impl MyApp { - All handler names must be unique when converted to CamelCase (e.g., `get_user` and `get_user` conflict) - Init methods must be async and take only `&mut self` - WebSocket methods must have exactly 3 parameters: `channel_id: u32`, `message_type: WsMessageType`, `blob: LazyLoadBlob` -- At least one handler must be defined (`#[http]`, `#[local]`, or `#[remote]`) +- ETH handlers must take exactly 1 parameter: `eth_sub_result: EthSubResult` +- Only one ETH handler is allowed per hyperprocess +- At least one handler must be defined (`#[http]`, `#[local]`, `#[remote]`, or `#[eth]`) - The macro provides comprehensive error messages with debugging tips for all validation failures **Current Limitations**: @@ -447,6 +450,57 @@ async fn handle_ws_client_async(&mut self, channel_id: u32, message_type: WsMess Both sync and async variants are supported. This handler receives messages from WebSocket servers that your process has connected to using the `http-client:distro:sys` service. The signature matches that of `#[ws]` for consistency. +#### ETH Handler + +For handling Ethereum subscription updates from the `eth:distro:sys` service, use: + +```rust +// Synchronous ETH handler +#[eth] +fn handle_eth(&mut self, eth_sub_result: EthSubResult) -> String { + match eth_sub_result { + Ok(eth_sub) => { + // Handle successful subscription update + println!("Got ETH subscription update: id={}, result={:?}", + eth_sub.id, eth_sub.result); + "Subscription update processed".to_string() + } + Err(eth_sub_error) => { + // Handle subscription error + println!("ETH subscription error: id={}, error={}", + eth_sub_error.id, eth_sub_error.error); + "Subscription error handled".to_string() + } + } +} + +// Asynchronous ETH handler +#[eth] +async fn handle_eth_async(&mut self, eth_sub_result: EthSubResult) -> String { + match eth_sub_result { + Ok(eth_sub) => { + // Process subscription update asynchronously + let processed = self.process_eth_event(ð_sub).await; + format!("Processed ETH event: {}", processed) + } + Err(eth_sub_error) => { + // Handle error asynchronously + self.log_eth_error(ð_sub_error).await; + "Error logged".to_string() + } + } +} +``` + +**Important Notes:** +- Only **one** ETH handler is allowed per hyperprocess +- The handler **must** take exactly one parameter: `eth_sub_result: EthSubResult` +- Both sync and async variants are supported +- The handler receives subscription updates and errors from the ETH module +- `EthSubResult` is a `Result` type where: + - `EthSub` contains subscription updates with `id: u64` and `result: serde_json::Value` + - `EthSubError` contains subscription errors with `id: u64` and `error: String` + ### Binding Endpoints The `endpoints` parameter configures HTTP and WebSocket endpoints: diff --git a/src/lib.rs b/src/lib.rs index cb45d48..ae1cdbd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -641,6 +641,47 @@ fn validate_request_response_function(method: &syn::ImplItemFn) -> syn::Result<( Ok(()) } +/// Validate the ETH handler signature +fn validate_eth_handler(method: &syn::ImplItemFn) -> syn::Result<()> { + // Ensure first param is &mut self + if !has_valid_self_receiver(method) { + return Err(syn::Error::new_spanned( + &method.sig, + "ETH handler must take &mut self as first parameter", + )); + } + + // Ensure there are exactly 2 parameters (&mut self + eth_sub_result) + if method.sig.inputs.len() != 2 { + return Err(syn::Error::new_spanned( + &method.sig, + "ETH handler must take exactly one parameter: eth_sub_result: EthSubResult", + )); + } + + // Get the second parameter (the eth_sub_result parameter) + let params: Vec<_> = method.sig.inputs.iter().skip(1).collect(); + let eth_param = ¶ms[0]; + + if let syn::FnArg::Typed(pat_type) = eth_param { + let type_str = pat_type.ty.to_token_stream().to_string(); + if !type_str.contains("EthSubResult") { + return Err(syn::Error::new_spanned( + pat_type, + "ETH handler parameter must be eth_sub_result: EthSubResult", + )); + } + } else { + return Err(syn::Error::new_spanned( + eth_param, + "ETH handler parameter must be eth_sub_result: EthSubResult", + )); + } + + // Any return type is allowed + Ok(()) +} + //------------------------------------------------------------------------------ // Method Analysis Functions //------------------------------------------------------------------------------ @@ -751,7 +792,7 @@ fn analyze_methods( "#[eth] cannot be combined with other attributes", )); } - validate_request_response_function(method)?; + validate_eth_handler(method)?; if eth_method.is_some() { return Err(syn::Error::new_spanned( method, @@ -767,7 +808,10 @@ fn analyze_methods( // Handle request-response methods if has_http || has_local || has_remote || has_eth { - validate_request_response_function(method)?; + // ETH handlers are already validated above, skip validation for them + if !has_eth { + validate_request_response_function(method)?; + } let metadata = extract_function_metadata(method, has_local, has_remote, has_http, has_eth); // Parameter-less HTTP handlers can optionally specify a path, but it's not required From d23a48f69ec5769c66a2c9d8d11b122575553534 Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Wed, 3 Sep 2025 17:26:54 -0400 Subject: [PATCH 4/6] changed logic to model more constrained handlers like ws --- src/lib.rs | 92 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 27 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ae1cdbd..166528e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,6 @@ struct HandlerGroups<'a> { local: Vec<&'a FunctionMetadata>, remote: Vec<&'a FunctionMetadata>, http: Vec<&'a FunctionMetadata>, - eth: Vec<&'a FunctionMetadata>, // New group for combined handlers (used for local messages that can also use remote handlers) local_and_remote: Vec<&'a FunctionMetadata>, } @@ -81,8 +80,6 @@ impl<'a> HandlerGroups<'a> { // Collect HTTP handlers let http: Vec<_> = metadata.iter().filter(|f| f.is_http).collect(); - // Collect ETH handler (there can only be one, but kept this for consistency) - let eth: Vec<_> = metadata.iter().filter(|f| f.is_eth).collect(); // Create a combined list of local and remote handlers for local messages // We first include all local handlers, then add remote handlers that aren't already covered @@ -101,7 +98,6 @@ impl<'a> HandlerGroups<'a> { local, remote, http, - eth, local_and_remote, } } @@ -112,7 +108,6 @@ struct HandlerDispatch { local: proc_macro2::TokenStream, remote: proc_macro2::TokenStream, http: proc_macro2::TokenStream, - eth: proc_macro2::TokenStream, local_and_remote: proc_macro2::TokenStream, } @@ -155,6 +150,12 @@ struct WsClientMethodDetails { call: proc_macro2::TokenStream, } +/// ETH method details for code generation +struct EthMethodDetails { + identifier: proc_macro2::TokenStream, + call: proc_macro2::TokenStream, +} + //------------------------------------------------------------------------------ // Parse Implementation //------------------------------------------------------------------------------ @@ -806,13 +807,10 @@ fn analyze_methods( // Continue with regular processing for function metadata } - // Handle request-response methods - if has_http || has_local || has_remote || has_eth { - // ETH handlers are already validated above, skip validation for them - if !has_eth { - validate_request_response_function(method)?; - } - let metadata = extract_function_metadata(method, has_local, has_remote, has_http, has_eth); + // Handle request-response methods (local, remote, http - NOT eth) + if has_http || has_local || has_remote { + validate_request_response_function(method)?; + let metadata = extract_function_metadata(method, has_local, has_remote, has_http, false); // Parameter-less HTTP handlers can optionally specify a path, but it's not required // They can use get_path() and get_method() to handle requests dynamically @@ -826,7 +824,7 @@ fn analyze_methods( if function_metadata.is_empty() { return Err(syn::Error::new( proc_macro2::Span::call_site(), - "You must specify at least one handler with #[remote], #[local], or #[http] attribute. Without any handlers, this hyperprocess wouldn't respond to any requests.", + "You must specify at least one handler with #[remote], #[local] or #[http] attribute. Without any handlers, this hyperprocess wouldn't respond to any requests.", )); } @@ -1360,6 +1358,40 @@ fn ws_client_method_opt_to_call( } } +/// Convert optional ETH method to token stream for identifier +fn eth_method_opt_to_token(eth_method: &Option) -> proc_macro2::TokenStream { + if let Some(method_info) = eth_method { + let method_name = &method_info.name; + quote! { Some(stringify!(#method_name)) } + } else { + quote! { None::<&str> } + } +} + +/// Convert optional ETH method to token stream for method call +fn eth_method_opt_to_call( + eth_method: &Option, + self_ty: &Box, +) -> proc_macro2::TokenStream { + if let Some(method_info) = eth_method { + let method_name = &method_info.name; + if method_info.is_async { + quote! { + // Create a raw pointer to state for use in the async block + let state_ptr: *mut #self_ty = state; + hyperware_process_lib::hyperapp::spawn(async move { + // Inside the async block, use the pointer to access state + unsafe { (*state_ptr).#method_name(eth_sub_result).await }; + }); + } + } else { + quote! { unsafe { (*state).#method_name(eth_sub_result) }; } + } + } else { + quote! {} + } +} + //------------------------------------------------------------------------------ // HTTP Helper Functions //------------------------------------------------------------------------------ @@ -1897,13 +1929,12 @@ fn generate_message_handlers( handler_arms: &HandlerDispatch, ws_method_call: &proc_macro2::TokenStream, ws_client_method_call: &proc_macro2::TokenStream, + eth_method_call: &proc_macro2::TokenStream, http_handlers: &[&FunctionMetadata], - eth_handlers: &[&FunctionMetadata], ) -> proc_macro2::TokenStream { let http_request_match_arms = &handler_arms.http; let local_and_remote_request_match_arms = &handler_arms.local_and_remote; let remote_request_match_arms = &handler_arms.remote; - let eth_request_match_arms = &handler_arms.eth; let http_context_setup = generate_http_context_setup(); let http_request_parsing = generate_http_request_parsing(); @@ -1917,7 +1948,7 @@ fn generate_message_handlers( let remote_message_handler = generate_remote_message_handler(self_ty, remote_request_match_arms); let eth_message_handler = - generate_eth_message_handler(self_ty, eth_request_match_arms); + generate_eth_message_handler(self_ty, eth_method_call); quote! { /// Handle WebSocket client messages @@ -1963,28 +1994,31 @@ fn generate_message_handlers( /// Generate ETH message handler fn generate_eth_message_handler( self_ty: &Box, - match_arms: &proc_macro2::TokenStream, + eth_method_call: &proc_macro2::TokenStream, ) -> proc_macro2::TokenStream { quote! { /// Handle ETH messages fn handle_eth_message(state: *mut #self_ty, message: hyperware_process_lib::Message) { hyperware_process_lib::logging::debug!("Processing ETH message from: {:?}", message.source()); - match serde_json::from_slice::(message.body()) { - Ok(request) => { + + // ETH messages contain EthSubResult directly, not wrapped in HPMRequest + match serde_json::from_slice::(message.body()) { + Ok(eth_sub_result) => { + hyperware_process_lib::logging::debug!("Successfully parsed EthSubResult, calling ETH handler"); + #eth_method_call unsafe { - #match_arms hyperware_process_lib::hyperapp::maybe_save_state(&mut *state); } }, Err(e) => { let raw_body = String::from_utf8_lossy(message.body()); hyperware_process_lib::logging::error!( - "Failed to deserialize ETH request into HPMRequest enum.\n\ + "Failed to deserialize ETH message into EthSubResult.\n\ Error: {}\n\ Source: {:?}\n\ Body: {}\n\ \n\ - 💡 This usually means the message format doesn't match any of your #[eth] handlers.", + 💡 This usually means the message format from eth:distro:sys doesn't match EthSubResult.", e, message.source(), raw_body ); } @@ -2013,10 +2047,10 @@ fn generate_component_impl( init_method_details: &InitMethodDetails, ws_method_details: &WsMethodDetails, ws_client_method_details: &WsClientMethodDetails, + eth_method_details: &EthMethodDetails, handler_arms: &HandlerDispatch, has_init_logging: bool, http_handlers: &[&FunctionMetadata], - eth_handlers: &[&FunctionMetadata], ) -> proc_macro2::TokenStream { // Extract values from args for use in the quote macro let name = &args.name; @@ -2049,6 +2083,7 @@ fn generate_component_impl( let init_method_call = &init_method_details.call; let ws_method_call = &ws_method_details.call; let ws_client_method_call = &ws_client_method_details.call; + let eth_method_call = ð_method_details.call; // Generate message handler functions let message_handlers = generate_message_handlers( @@ -2056,8 +2091,8 @@ fn generate_component_impl( handler_arms, ws_method_call, ws_client_method_call, + eth_method_call, http_handlers, - eth_handlers, ); // Generate the logging initialization conditionally @@ -2253,7 +2288,6 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { self_ty, HandlerType::Local, ), - eth: generate_handler_dispatch(&handlers.eth, self_ty, HandlerType::Eth), }; // Clean the implementation block @@ -2277,7 +2311,11 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { call: ws_client_method_opt_to_call(&ws_client_method, self_ty), }; - + // Prepare ETH method details for code generation + let eth_method_details = EthMethodDetails { + identifier: eth_method_opt_to_token(ð_method), + call: eth_method_opt_to_call(ð_method, self_ty), + }; // Generate the final output generate_component_impl( @@ -2289,10 +2327,10 @@ pub fn hyperprocess(attr: TokenStream, item: TokenStream) -> TokenStream { &init_method_details, &ws_method_details, &ws_client_method_details, + ð_method_details, &handler_arms, has_init_logging, &handlers.http, - &handlers.eth, ) .into() } From ec6a6142bad266c6395331152ac05adba7a10fe1 Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Thu, 4 Sep 2025 17:37:52 -0400 Subject: [PATCH 5/6] fixed docs --- README.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 429a006..098a23e 100644 --- a/README.md +++ b/README.md @@ -469,7 +469,15 @@ fn handle_eth(&mut self, eth_sub_result: EthSubResult) -> String { // Handle subscription error println!("ETH subscription error: id={}, error={}", eth_sub_error.id, eth_sub_error.error); - "Subscription error handled".to_string() + + self.hypermap.provider.subscribe_loop( + 1, + make_filter(&state.hypermap, None), + 0, + 0, + ); + + "ETH subscription error resolved, subscription reinstated".to_string() } } } @@ -486,7 +494,15 @@ async fn handle_eth_async(&mut self, eth_sub_result: EthSubResult) -> String { Err(eth_sub_error) => { // Handle error asynchronously self.log_eth_error(ð_sub_error).await; - "Error logged".to_string() + + self.hypermap.provider.subscribe_loop( + 1, + make_filter(&state.hypermap, None), + 0, + 0, + ); + + "ETH subscription error resolved, subscription reinstated".to_string() } } } From dcfea0b8c2b5723689a73b529c9c052463ad76b0 Mon Sep 17 00:00:00 2001 From: Gohlub <62673775+Gohlub@users.noreply.github.com> Date: Thu, 4 Sep 2025 17:42:10 -0400 Subject: [PATCH 6/6] better handling --- README.md | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 098a23e..5cdd45d 100644 --- a/README.md +++ b/README.md @@ -455,7 +455,7 @@ The signature matches that of `#[ws]` for consistency. For handling Ethereum subscription updates from the `eth:distro:sys` service, use: ```rust -// Synchronous ETH handler +// Synchronous ETH handler with resubscription #[eth] fn handle_eth(&mut self, eth_sub_result: EthSubResult) -> String { match eth_sub_result { @@ -466,23 +466,25 @@ fn handle_eth(&mut self, eth_sub_result: EthSubResult) -> String { "Subscription update processed".to_string() } Err(eth_sub_error) => { - // Handle subscription error + // Handle subscription error with resubscription println!("ETH subscription error: id={}, error={}", eth_sub_error.id, eth_sub_error.error); - + + // Clean up existing subscription and resubscribe + let _ = self.hypermap.provider.unsubscribe(1); self.hypermap.provider.subscribe_loop( - 1, - make_filter(&state.hypermap, None), - 0, - 0, - ); - + 1, + make_filter(&self.hypermap, None), + 0, + 0, + ); + "ETH subscription error resolved, subscription reinstated".to_string() } } } -// Asynchronous ETH handler +// Asynchronous ETH handler with resubscription #[eth] async fn handle_eth_async(&mut self, eth_sub_result: EthSubResult) -> String { match eth_sub_result { @@ -492,16 +494,18 @@ async fn handle_eth_async(&mut self, eth_sub_result: EthSubResult) -> String { format!("Processed ETH event: {}", processed) } Err(eth_sub_error) => { - // Handle error asynchronously + // Handle error asynchronously with resubscription self.log_eth_error(ð_sub_error).await; + // Clean up existing subscription and resubscribe + let _ = self.hypermap.provider.unsubscribe(1); self.hypermap.provider.subscribe_loop( - 1, - make_filter(&state.hypermap, None), - 0, - 0, - ); - + 1, + make_filter(&self.hypermap, None), + 0, + 0, + ); + "ETH subscription error resolved, subscription reinstated".to_string() } } @@ -516,6 +520,8 @@ async fn handle_eth_async(&mut self, eth_sub_result: EthSubResult) -> String { - `EthSubResult` is a `Result` type where: - `EthSub` contains subscription updates with `id: u64` and `result: serde_json::Value` - `EthSubError` contains subscription errors with `id: u64` and `error: String` +- **Resubscription Pattern**: Always unsubscribe first, then resubscribe with current state + ### Binding Endpoints