From 13d56dfeaed938ba0094a9373db0aca6fe36189d Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Fri, 30 Aug 2024 00:45:24 -0700 Subject: [PATCH 1/5] eth: fix remote provider 1. dont unsubscribe, since subs are de-duped by alloy; if you unsub one mint, e.g., you unsub all 2. cache eth requests 3. retry eth requests once on rpc error --- Cargo.lock | 1 + kinode/Cargo.toml | 1 + kinode/src/eth/mod.rs | 128 +++++++++++++++++++++++++++------ kinode/src/eth/subscription.rs | 22 +++--- lib/src/eth.rs | 4 +- 5 files changed, 118 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 044df8212..41152555a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3547,6 +3547,7 @@ dependencies = [ "hex", "hmac", "http 1.1.0", + "indexmap", "jwt", "kit 0.6.10", "lazy_static", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 67077b3e1..3536f3def 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -58,6 +58,7 @@ generic-array = "0.14.7" hex = "0.4.3" hmac = "0.12" http = "1.1.0" +indexmap = "2.4" jwt = "0.16" lib = { path = "../lib" } lazy_static = "1.4.0" diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 82fc34998..7d5928175 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -4,11 +4,14 @@ use alloy::rpc::client::WsConnect; use alloy::rpc::json_rpc::RpcError; use anyhow::Result; use dashmap::DashMap; +use indexmap::IndexMap; use lib::types::core::*; use lib::types::eth::*; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use url::Url; @@ -158,8 +161,17 @@ struct ModuleState { send_to_loop: MessageSender, /// our sender for terminal prints print_tx: PrintSender, + /// cache of ETH requests + request_cache: RequestCache, + /// duration since we sent last eth_blockNumber request + since_last_block_number: Arc>>, } +type RequestCache = Arc, EthResponse>>>; + +const DELAY_MS: u64 = 1_000; +const MAX_REQUEST_CACHE_LEN: usize = 500; + /// TODO replace with alloy abstraction fn valid_method(method: &str) -> Option<&'static str> { match method { @@ -240,6 +252,8 @@ pub async fn provider( response_channels: Arc::new(DashMap::new()), send_to_loop, print_tx, + request_cache: Arc::new(Mutex::new(IndexMap::new())), + since_last_block_number: Arc::new(Mutex::new(None)), }; // convert saved configs into data structure that we will use to route queries @@ -598,13 +612,15 @@ async fn handle_eth_action( } } EthAction::Request { .. } => { - let (sender, receiver) = tokio::sync::mpsc::channel(1); + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); state.response_channels.insert(km.id, sender); let our = state.our.to_string(); let send_to_loop = state.send_to_loop.clone(); let providers = state.providers.clone(); let response_channels = state.response_channels.clone(); let print_tx = state.print_tx.clone(); + let mut request_cache = Arc::clone(&state.request_cache); + let mut since_last_block_number = Arc::clone(&state.since_last_block_number); tokio::spawn(async move { match tokio::time::timeout( std::time::Duration::from_secs(timeout), @@ -612,26 +628,67 @@ async fn handle_eth_action( &our, km.id, &send_to_loop, - eth_action, - providers, - receiver, + ð_action, + &providers, + &mut receiver, &print_tx, + &mut request_cache, + &mut since_last_block_number, ), ) .await { Ok(response) => { - kernel_message( - &our, - km.id, - km.rsvp.unwrap_or(km.source), - None, - false, - None, - response, - &send_to_loop, - ) - .await; + if let EthResponse::Err(EthError::RpcError(_)) = response { + // try one more time after 1s delay in case RPC is rate limiting + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + match tokio::time::timeout( + std::time::Duration::from_secs(timeout), + fulfill_request( + &our, + km.id, + &send_to_loop, + ð_action, + &providers, + &mut receiver, + &print_tx, + &mut request_cache, + &mut since_last_block_number, + ), + ) + .await { + Ok(response) => { + kernel_message( + &our, + km.id, + km.rsvp.clone().unwrap_or(km.source.clone()), + None, + false, + None, + response, + &send_to_loop, + ) + .await; + } + Err(_) => { + // task timeout + error_message(&our, km.id, km.source.clone(), EthError::RpcTimeout, &send_to_loop) + .await; + } + } + } else { + kernel_message( + &our, + km.id, + km.rsvp.unwrap_or(km.source), + None, + false, + None, + response, + &send_to_loop, + ) + .await; + } } Err(_) => { // task timeout @@ -650,19 +707,36 @@ async fn fulfill_request( our: &str, km_id: u64, send_to_loop: &MessageSender, - eth_action: EthAction, - providers: Providers, - mut remote_request_receiver: ProcessMessageReceiver, + eth_action: &EthAction, + providers: &Providers, + remote_request_receiver: &mut ProcessMessageReceiver, print_tx: &PrintSender, + request_cache: &mut RequestCache, + since_last_block_number: &mut Arc>>, ) -> EthResponse { + let serialized_action = serde_json::to_vec(eth_action).unwrap(); let EthAction::Request { - chain_id, + ref chain_id, ref method, ref params, } = eth_action else { return EthResponse::Err(EthError::PermissionDenied); // will never hit }; + { + let since_last_block_number = since_last_block_number.lock().await; + if since_last_block_number.is_some_and(|t| t.elapsed() > Duration::from_millis(DELAY_MS)) + || method != "eth_blockNumber" + { + let mut request_cache = request_cache.lock().await; + if let Some(cache_hit) = request_cache.shift_remove(&serialized_action) { + // refresh cache entry (it is most recently accessed) & return it + println!("cache hit\r"); + request_cache.insert(serialized_action, cache_hit.clone()); + return cache_hit; + } + } + } let Some(method) = valid_method(&method) else { return EthResponse::Err(EthError::InvalidMethod(method.to_string())); }; @@ -703,7 +777,7 @@ async fn fulfill_request( match pubsub.raw_request(method.into(), params.clone()).await { Ok(value) => { let mut is_replacement_successful = true; - providers.entry(chain_id).and_modify(|aps| { + providers.entry(chain_id.clone()).and_modify(|aps| { let Some(index) = find_index( &aps.urls.iter().map(|u| u.url.as_str()).collect(), &url_provider.url, @@ -724,7 +798,15 @@ async fn fulfill_request( ) .await; } - return EthResponse::Response { value }; + let response = EthResponse::Response { value }; + let mut request_cache = request_cache.lock().await; + println!("cache add"); + if request_cache.len() >= MAX_REQUEST_CACHE_LEN { + // drop 10% oldest cache entries + request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10); + } + request_cache.insert(serialized_action, response.clone()); + return response; } Err(rpc_error) => { verbose_print( @@ -741,7 +823,7 @@ async fn fulfill_request( } // this provider failed and needs to be reset let mut is_reset_successful = true; - providers.entry(chain_id).and_modify(|aps| { + providers.entry(chain_id.clone()).and_modify(|aps| { let Some(index) = find_index( &aps.urls.iter().map(|u| u.url.as_str()).collect(), &url_provider.url, @@ -787,7 +869,7 @@ async fn fulfill_request( node_provider, eth_action.clone(), send_to_loop, - &mut remote_request_receiver, + remote_request_receiver, ) .await; if let EthResponse::Err(e) = response { diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index a49cbf972..33ce369dc 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -386,24 +386,20 @@ async fn maintain_local_subscription( mut close_receiver: tokio::sync::mpsc::Receiver, print_tx: &PrintSender, ) -> Result<(), EthSubError> { - loop { + let e = loop { tokio::select! { _ = close_receiver.recv() => { - unsubscribe(rx, &chain_id, providers, print_tx).await; + //unsubscribe(rx, &chain_id, providers, print_tx).await; return Ok(()); }, value = rx.recv() => { - let Ok(value) = value else { - break; + let value = match value { + Ok(v) => v, + Err(e) => break e.to_string(), }; let result: SubscriptionResult = match serde_json::from_str(value.get()) { Ok(res) => res, - Err(e) => { - return Err(EthSubError { - id: sub_id, - error: e.to_string(), - }); - } + Err(e) => break e.to_string(), }; kernel_message( our, @@ -418,16 +414,16 @@ async fn maintain_local_subscription( .await; }, } - } + }; active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - unsubscribe(rx, &chain_id, providers, print_tx).await; + //unsubscribe(rx, &chain_id, providers, print_tx).await; Err(EthSubError { id: sub_id, - error: format!("subscription ({target}) closed unexpectedly"), + error: format!("subscription ({target}) closed unexpectedly {e}"), }) } diff --git a/lib/src/eth.rs b/lib/src/eth.rs index db4f05dd3..2f88244f1 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -53,14 +53,14 @@ pub struct EthSubError { /// /// In the case of an [`EthAction::SubscribeLogs`] request, the response will indicate if /// the subscription was successfully created or not. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum EthResponse { Ok, Response { value: serde_json::Value }, Err(EthError), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum EthError { /// RPC provider returned an error RpcError(ErrorPayload), From dc267a41f9c8764a3cf8ae203cc49d04d797e524 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 30 Aug 2024 07:47:25 +0000 Subject: [PATCH 2/5] Format Rust code using rustfmt --- kinode/src/eth/mod.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 7d5928175..ae355bab1 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -656,7 +656,8 @@ async fn handle_eth_action( &mut since_last_block_number, ), ) - .await { + .await + { Ok(response) => { kernel_message( &our, @@ -672,8 +673,14 @@ async fn handle_eth_action( } Err(_) => { // task timeout - error_message(&our, km.id, km.source.clone(), EthError::RpcTimeout, &send_to_loop) - .await; + error_message( + &our, + km.id, + km.source.clone(), + EthError::RpcTimeout, + &send_to_loop, + ) + .await; } } } else { From 0972b09d8589c177836f566d0631947b0747868b Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Fri, 30 Aug 2024 09:00:22 -0700 Subject: [PATCH 3/5] make cache entries go stale --- kinode/src/eth/mod.rs | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index ae355bab1..f410338f8 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -163,11 +163,9 @@ struct ModuleState { print_tx: PrintSender, /// cache of ETH requests request_cache: RequestCache, - /// duration since we sent last eth_blockNumber request - since_last_block_number: Arc>>, } -type RequestCache = Arc, EthResponse>>>; +type RequestCache = Arc, (EthResponse, Instant)>>>; const DELAY_MS: u64 = 1_000; const MAX_REQUEST_CACHE_LEN: usize = 500; @@ -253,7 +251,6 @@ pub async fn provider( send_to_loop, print_tx, request_cache: Arc::new(Mutex::new(IndexMap::new())), - since_last_block_number: Arc::new(Mutex::new(None)), }; // convert saved configs into data structure that we will use to route queries @@ -620,7 +617,6 @@ async fn handle_eth_action( let response_channels = state.response_channels.clone(); let print_tx = state.print_tx.clone(); let mut request_cache = Arc::clone(&state.request_cache); - let mut since_last_block_number = Arc::clone(&state.since_last_block_number); tokio::spawn(async move { match tokio::time::timeout( std::time::Duration::from_secs(timeout), @@ -633,7 +629,6 @@ async fn handle_eth_action( &mut receiver, &print_tx, &mut request_cache, - &mut since_last_block_number, ), ) .await @@ -653,7 +648,6 @@ async fn handle_eth_action( &mut receiver, &print_tx, &mut request_cache, - &mut since_last_block_number, ), ) .await @@ -719,7 +713,6 @@ async fn fulfill_request( remote_request_receiver: &mut ProcessMessageReceiver, print_tx: &PrintSender, request_cache: &mut RequestCache, - since_last_block_number: &mut Arc>>, ) -> EthResponse { let serialized_action = serde_json::to_vec(eth_action).unwrap(); let EthAction::Request { @@ -731,15 +724,12 @@ async fn fulfill_request( return EthResponse::Err(EthError::PermissionDenied); // will never hit }; { - let since_last_block_number = since_last_block_number.lock().await; - if since_last_block_number.is_some_and(|t| t.elapsed() > Duration::from_millis(DELAY_MS)) - || method != "eth_blockNumber" - { - let mut request_cache = request_cache.lock().await; - if let Some(cache_hit) = request_cache.shift_remove(&serialized_action) { - // refresh cache entry (it is most recently accessed) & return it + let mut request_cache = request_cache.lock().await; + if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) { + // refresh cache entry (it is most recently accessed) & return it + if time_of_hit.elapsed() > Duration::from_millis(DELAY_MS) { println!("cache hit\r"); - request_cache.insert(serialized_action, cache_hit.clone()); + request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit)); return cache_hit; } } @@ -812,7 +802,7 @@ async fn fulfill_request( // drop 10% oldest cache entries request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10); } - request_cache.insert(serialized_action, response.clone()); + request_cache.insert(serialized_action, (response.clone(), Instant::now())); return response; } Err(rpc_error) => { From 953f1ffafbe60b9cbdcbbfaf34d5fdf250469593 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Fri, 30 Aug 2024 09:15:04 -0700 Subject: [PATCH 4/5] fix conditional expression --- kinode/src/eth/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index f410338f8..f8e816261 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -727,7 +727,7 @@ async fn fulfill_request( let mut request_cache = request_cache.lock().await; if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) { // refresh cache entry (it is most recently accessed) & return it - if time_of_hit.elapsed() > Duration::from_millis(DELAY_MS) { + if time_of_hit.elapsed() < Duration::from_millis(DELAY_MS) { println!("cache hit\r"); request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit)); return cache_hit; From 0a94f7aa26a89a0446b37e97805e83a8dd07111d Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Fri, 30 Aug 2024 16:36:24 -0700 Subject: [PATCH 5/5] remove prints --- kinode/src/eth/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index f8e816261..cf9d371ea 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -728,7 +728,6 @@ async fn fulfill_request( if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) { // refresh cache entry (it is most recently accessed) & return it if time_of_hit.elapsed() < Duration::from_millis(DELAY_MS) { - println!("cache hit\r"); request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit)); return cache_hit; } @@ -797,7 +796,6 @@ async fn fulfill_request( } let response = EthResponse::Response { value }; let mut request_cache = request_cache.lock().await; - println!("cache add"); if request_cache.len() >= MAX_REQUEST_CACHE_LEN { // drop 10% oldest cache entries request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10);