Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions kinode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ generic-array = "0.14.7"
hex = "0.4.3"
hmac = "0.12"
http = "1.1.0"
indexmap = "2.4"
jwt = "0.16"
lib = { path = "../lib" }
lazy_static = "1.4.0"
Expand Down
123 changes: 100 additions & 23 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use alloy::rpc::client::WsConnect;
use alloy::rpc::json_rpc::RpcError;
use anyhow::Result;
use dashmap::DashMap;
use indexmap::IndexMap;
use lib::types::core::*;
use lib::types::eth::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use url::Url;

Expand Down Expand Up @@ -158,8 +161,15 @@ struct ModuleState {
send_to_loop: MessageSender,
/// our sender for terminal prints
print_tx: PrintSender,
/// cache of ETH requests
request_cache: RequestCache,
}

type RequestCache = Arc<Mutex<IndexMap<Vec<u8>, (EthResponse, Instant)>>>;

const DELAY_MS: u64 = 1_000;
const MAX_REQUEST_CACHE_LEN: usize = 500;

/// TODO replace with alloy abstraction
fn valid_method(method: &str) -> Option<&'static str> {
match method {
Expand Down Expand Up @@ -240,6 +250,7 @@ pub async fn provider(
response_channels: Arc::new(DashMap::new()),
send_to_loop,
print_tx,
request_cache: Arc::new(Mutex::new(IndexMap::new())),
};

// convert saved configs into data structure that we will use to route queries
Expand Down Expand Up @@ -598,40 +609,87 @@ async fn handle_eth_action(
}
}
EthAction::Request { .. } => {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
state.response_channels.insert(km.id, sender);
let our = state.our.to_string();
let send_to_loop = state.send_to_loop.clone();
let providers = state.providers.clone();
let response_channels = state.response_channels.clone();
let print_tx = state.print_tx.clone();
let mut request_cache = Arc::clone(&state.request_cache);
tokio::spawn(async move {
match tokio::time::timeout(
std::time::Duration::from_secs(timeout),
fulfill_request(
&our,
km.id,
&send_to_loop,
eth_action,
providers,
receiver,
&eth_action,
&providers,
&mut receiver,
&print_tx,
&mut request_cache,
),
)
.await
{
Ok(response) => {
kernel_message(
&our,
km.id,
km.rsvp.unwrap_or(km.source),
None,
false,
None,
response,
&send_to_loop,
)
.await;
if let EthResponse::Err(EthError::RpcError(_)) = response {
// try one more time after 1s delay in case RPC is rate limiting
std::thread::sleep(std::time::Duration::from_millis(DELAY_MS));
match tokio::time::timeout(
std::time::Duration::from_secs(timeout),
fulfill_request(
&our,
km.id,
&send_to_loop,
&eth_action,
&providers,
&mut receiver,
&print_tx,
&mut request_cache,
),
)
.await
{
Ok(response) => {
kernel_message(
&our,
km.id,
km.rsvp.clone().unwrap_or(km.source.clone()),
None,
false,
None,
response,
&send_to_loop,
)
.await;
}
Err(_) => {
// task timeout
error_message(
&our,
km.id,
km.source.clone(),
EthError::RpcTimeout,
&send_to_loop,
)
.await;
}
}
} else {
kernel_message(
&our,
km.id,
km.rsvp.unwrap_or(km.source),
None,
false,
None,
response,
&send_to_loop,
)
.await;
}
}
Err(_) => {
// task timeout
Expand All @@ -650,19 +708,31 @@ async fn fulfill_request(
our: &str,
km_id: u64,
send_to_loop: &MessageSender,
eth_action: EthAction,
providers: Providers,
mut remote_request_receiver: ProcessMessageReceiver,
eth_action: &EthAction,
providers: &Providers,
remote_request_receiver: &mut ProcessMessageReceiver,
print_tx: &PrintSender,
request_cache: &mut RequestCache,
) -> EthResponse {
let serialized_action = serde_json::to_vec(eth_action).unwrap();
let EthAction::Request {
chain_id,
ref chain_id,
ref method,
ref params,
} = eth_action
else {
return EthResponse::Err(EthError::PermissionDenied); // will never hit
};
{
let mut request_cache = request_cache.lock().await;
if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) {
// refresh cache entry (it is most recently accessed) & return it
if time_of_hit.elapsed() < Duration::from_millis(DELAY_MS) {
request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit));
return cache_hit;
}
}
}
let Some(method) = valid_method(&method) else {
return EthResponse::Err(EthError::InvalidMethod(method.to_string()));
};
Expand Down Expand Up @@ -703,7 +773,7 @@ async fn fulfill_request(
match pubsub.raw_request(method.into(), params.clone()).await {
Ok(value) => {
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
Expand All @@ -724,7 +794,14 @@ async fn fulfill_request(
)
.await;
}
return EthResponse::Response { value };
let response = EthResponse::Response { value };
let mut request_cache = request_cache.lock().await;
if request_cache.len() >= MAX_REQUEST_CACHE_LEN {
// drop 10% oldest cache entries
request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10);
}
request_cache.insert(serialized_action, (response.clone(), Instant::now()));
return response;
}
Err(rpc_error) => {
verbose_print(
Expand All @@ -741,7 +818,7 @@ async fn fulfill_request(
}
// this provider failed and needs to be reset
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
Expand Down Expand Up @@ -787,7 +864,7 @@ async fn fulfill_request(
node_provider,
eth_action.clone(),
send_to_loop,
&mut remote_request_receiver,
remote_request_receiver,
)
.await;
if let EthResponse::Err(e) = response {
Expand Down
22 changes: 9 additions & 13 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,24 +386,20 @@ async fn maintain_local_subscription(
mut close_receiver: tokio::sync::mpsc::Receiver<bool>,
print_tx: &PrintSender,
) -> Result<(), EthSubError> {
loop {
let e = loop {
tokio::select! {
_ = close_receiver.recv() => {
unsubscribe(rx, &chain_id, providers, print_tx).await;
//unsubscribe(rx, &chain_id, providers, print_tx).await;
return Ok(());
},
value = rx.recv() => {
let Ok(value) = value else {
break;
let value = match value {
Ok(v) => v,
Err(e) => break e.to_string(),
};
let result: SubscriptionResult = match serde_json::from_str(value.get()) {
Ok(res) => res,
Err(e) => {
return Err(EthSubError {
id: sub_id,
error: e.to_string(),
});
}
Err(e) => break e.to_string(),
};
kernel_message(
our,
Expand All @@ -418,16 +414,16 @@ async fn maintain_local_subscription(
.await;
},
}
}
};
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
unsubscribe(rx, &chain_id, providers, print_tx).await;
//unsubscribe(rx, &chain_id, providers, print_tx).await;
Err(EthSubError {
id: sub_id,
error: format!("subscription ({target}) closed unexpectedly"),
error: format!("subscription ({target}) closed unexpectedly {e}"),
})
}

Expand Down
4 changes: 2 additions & 2 deletions lib/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub struct EthSubError {
///
/// In the case of an [`EthAction::SubscribeLogs`] request, the response will indicate if
/// the subscription was successfully created or not.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum EthResponse {
Ok,
Response { value: serde_json::Value },
Err(EthError),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum EthError {
/// RPC provider returned an error
RpcError(ErrorPayload),
Expand Down