From 10ecf5672d60461043b7c4d35552c425a043d28b Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 18 Jul 2024 20:55:17 +0200 Subject: [PATCH 1/5] sqlite: fix potential nullable columns read --- kinode/src/sqlite.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index dec0145cf..6608723c9 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -203,14 +203,14 @@ async fn handle_request( .query_map(rusqlite::params_from_iter(parameters.iter()), |row| { let mut map = HashMap::new(); for (i, column_name) in column_names.iter().enumerate() { - let value: SqlValue = row.get(i)?; + let value: Option = row.get(i)?; let value_json = match value { - SqlValue::Integer(int) => serde_json::Value::Number(int.into()), - SqlValue::Real(real) => serde_json::Value::Number( + Some(SqlValue::Integer(int)) => serde_json::Value::Number(int.into()), + Some(SqlValue::Real(real)) => serde_json::Value::Number( serde_json::Number::from_f64(real).unwrap(), ), - SqlValue::Text(text) => serde_json::Value::String(text), - SqlValue::Blob(blob) => { + Some(SqlValue::Text(text)) => serde_json::Value::String(text), + Some(SqlValue::Blob(blob)) => { serde_json::Value::String(base64_standard.encode(blob)) } // or another representation if you prefer _ => serde_json::Value::Null, From e08510bf47666c453c976b41dc2be0bc129f6ec8 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 18 Jul 2024 17:34:55 -0700 Subject: [PATCH 2/5] bump to 0.8.6 --- Cargo.toml | 2 +- kinode/Cargo.toml | 2 +- lib/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8b1a7f811..e37cf88ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode_lib" authors = ["KinodeDAO"] -version = "0.8.5" +version = "0.8.6" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 054f596ff..c0f5ee856 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode" authors = ["KinodeDAO"] -version = "0.8.5" +version = "0.8.6" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 8c6367db7..b79fc8a7b 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lib" authors = ["KinodeDAO"] -version = "0.8.5" +version = "0.8.6" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" From 8ea52f4ef238deb35b4feba2beb6c9b671379a55 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 18 Jul 2024 17:35:26 -0700 Subject: [PATCH 3/5] eth: fix deadlock casued by multiple subscriptions/requests --- kinode/src/eth/mod.rs | 118 ++++++++++++++++++++++++++++++--- kinode/src/eth/subscription.rs | 81 +++++++++++++++++++--- 2 files changed, 180 insertions(+), 19 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 2ca7d26d1..ea0b72eea 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -37,14 +37,14 @@ struct ActiveProviders { pub nodes: Vec, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct UrlProvider { pub trusted: bool, pub url: String, pub pubsub: Option>, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct NodeProvider { /// NOT CURRENTLY USED pub trusted: bool, @@ -581,8 +581,12 @@ async fn fulfill_request( let Some(method) = to_static_str(&method) else { return EthResponse::Err(EthError::InvalidMethod(method.to_string())); }; - let Some(mut aps) = providers.get_mut(&chain_id) else { - return EthResponse::Err(EthError::NoRpcForChain); + let mut urls = { + // in code block to drop providers lock asap to avoid deadlock + let Some(aps) = providers.get(&chain_id) else { + return EthResponse::Err(EthError::NoRpcForChain); + }; + aps.urls.clone() }; // first, try any url providers we have for this chain, @@ -590,7 +594,7 @@ async fn fulfill_request( // finally, if no provider works, return an error. // bump the successful provider to the front of the list for future requests - for (index, url_provider) in aps.urls.iter_mut().enumerate() { + for url_provider in urls.iter_mut() { let pubsub = match &url_provider.pubsub { Some(pubsub) => pubsub, None => { @@ -613,8 +617,27 @@ async fn fulfill_request( }; match pubsub.raw_request(method.into(), params.clone()).await { Ok(value) => { - let successful_provider = aps.urls.remove(index); - aps.urls.insert(0, successful_provider); + let mut is_replacement_successful = true; + providers + .entry(chain_id) + .and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url) else + { + is_replacement_successful = false; + return (); + }; + aps.urls.remove(index); + aps.urls.insert(0, url_provider.clone()); + }); + if !is_replacement_successful { + verbose_print( + print_tx, + &format!("eth: unexpectedly couldn't find provider to be modified"), + ) + .await; + } return EthResponse::Response { value }; } Err(rpc_error) => { @@ -631,11 +654,40 @@ async fn fulfill_request( return EthResponse::Err(EthError::RpcError(err)); } // this provider failed and needs to be reset - url_provider.pubsub = None; + let mut is_reset_successful = true; + providers + .entry(chain_id) + .and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url) else + { + is_reset_successful = false; + return (); + }; + let mut url = aps.urls.remove(index); + url.pubsub = None; + aps.urls.insert(index, url); + }); + if !is_reset_successful { + verbose_print( + print_tx, + &format!("eth: unexpectedly couldn't find provider to be modified"), + ) + .await; + } } } } - for node_provider in &mut aps.nodes { + + let nodes = { + // in code block to drop providers lock asap to avoid deadlock + let Some(aps) = providers.get(&chain_id) else { + return EthResponse::Err(EthError::NoRpcForChain); + }; + aps.nodes.clone() + }; + for node_provider in &nodes { verbose_print( print_tx, &format!( @@ -656,7 +708,12 @@ async fn fulfill_request( .await; if let EthResponse::Err(e) = response { if let EthError::RpcMalformedResponse = e { - node_provider.usable = false; + set_node_unusable( + &providers, + &chain_id, + &node_provider.kns_update.name, + print_tx, + ).await; } } else { return response; @@ -987,3 +1044,44 @@ async fn kernel_message( }) .await; } + +fn find_index(vec: &Vec<&str>, item: &str) -> Option { + vec.iter().enumerate().find_map(|(index, value)| { + if *value == item { + Some(index) + } else { + None + } + }) +} + +async fn set_node_unusable( + providers: &Providers, + chain_id: &u64, + node_name: &str, + print_tx: &PrintSender, +) -> bool { + let mut is_replacement_successful = true; + providers + .entry(chain_id.clone()) + .and_modify(|aps| { + let Some(index) = find_index( + &aps.nodes.iter().map(|n| n.kns_update.name.as_str()).collect(), + &node_name + ) else { + is_replacement_successful = false; + return (); + }; + let mut node = aps.nodes.remove(index); + node.usable = false; + aps.nodes.insert(index, node); + }); + if !is_replacement_successful { + verbose_print( + print_tx, + &format!("eth: unexpectedly couldn't find provider to be modified"), + ) + .await; + } + is_replacement_successful +} diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 41d9c8c14..737f5ec9e 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -180,15 +180,21 @@ async fn build_subscription( else { return Err(EthError::PermissionDenied); // will never hit }; - let Some(mut aps) = providers.get_mut(&chain_id) else { - return Err(EthError::NoRpcForChain); + let mut urls = { + // in code block to drop providers lock asap to avoid deadlock + let Some(aps) = providers.get(&chain_id) else { + return Err(EthError::NoRpcForChain); + }; + aps.urls.clone() }; + let chain_id = chain_id.clone(); + // first, try any url providers we have for this chain, // then if we have none or they all fail, go to node providers. // finally, if no provider works, return an error. // bump the successful provider to the front of the list for future requests - for (index, url_provider) in aps.urls.iter_mut().enumerate() { + for url_provider in urls.iter_mut() { let pubsub = match &url_provider.pubsub { Some(pubsub) => pubsub, None => { @@ -217,8 +223,27 @@ async fn build_subscription( { Ok(sub) => { let rx = sub.into_raw(); - let successful_provider = aps.urls.remove(index); - aps.urls.insert(0, successful_provider); + let mut is_replacement_successful = true; + providers + .entry(chain_id) + .and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url) else + { + is_replacement_successful = false; + return (); + }; + aps.urls.remove(index); + aps.urls.insert(0, url_provider.clone()); + }); + if !is_replacement_successful { + verbose_print( + print_tx, + &format!("eth: unexpectedly couldn't find provider to be modified"), + ) + .await; + } return Ok(Ok(rx)); } Err(rpc_error) => { @@ -231,7 +256,28 @@ async fn build_subscription( ) .await; // this provider failed and needs to be reset - url_provider.pubsub = None; + let mut is_reset_successful = true; + providers + .entry(chain_id) + .and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url) else + { + is_reset_successful = false; + return (); + }; + let mut url = aps.urls.remove(index); + url.pubsub = None; + aps.urls.insert(index, url); + }); + if !is_reset_successful { + verbose_print( + print_tx, + &format!("eth: unexpectedly couldn't find provider to be modified"), + ) + .await; + } } } } @@ -241,7 +287,14 @@ async fn build_subscription( // we need to create our own unique sub id because in the remote provider node, // all subs will be identified under our process address. let remote_sub_id = rand::random(); - for node_provider in &mut aps.nodes { + let nodes = { + // in code block to drop providers lock asap to avoid deadlock + let Some(aps) = providers.get(&chain_id) else { + return Err(EthError::NoRpcForChain); + }; + aps.nodes.clone() + }; + for node_provider in &nodes { verbose_print( &print_tx, &format!( @@ -283,11 +336,21 @@ async fn build_subscription( } EthResponse::Response { .. } => { // the response to a SubscribeLogs request must be an 'ok' - node_provider.usable = false; + set_node_unusable( + &providers, + &chain_id, + &node_provider.kns_update.name, + print_tx, + ).await; } EthResponse::Err(e) => { if let EthError::RpcMalformedResponse = e { - node_provider.usable = false; + set_node_unusable( + &providers, + &chain_id, + &node_provider.kns_update.name, + print_tx, + ).await; } } } From a9f73c5d25731e5418c083b51734d4ef05ab0095 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 19 Jul 2024 00:37:20 +0000 Subject: [PATCH 4/5] Format Rust code using rustfmt --- kinode/src/eth/mod.rs | 98 +++++++++++++++++----------------- kinode/src/eth/subscription.rs | 56 ++++++++++--------- 2 files changed, 76 insertions(+), 78 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index ea0b72eea..53d640aad 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -618,19 +618,17 @@ async fn fulfill_request( match pubsub.raw_request(method.into(), params.clone()).await { Ok(value) => { let mut is_replacement_successful = true; - providers - .entry(chain_id) - .and_modify(|aps| { - let Some(index) = find_index( - &aps.urls.iter().map(|u| u.url.as_str()).collect(), - &url_provider.url) else - { - is_replacement_successful = false; - return (); - }; - aps.urls.remove(index); - aps.urls.insert(0, url_provider.clone()); - }); + providers.entry(chain_id).and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url, + ) else { + is_replacement_successful = false; + return (); + }; + aps.urls.remove(index); + aps.urls.insert(0, url_provider.clone()); + }); if !is_replacement_successful { verbose_print( print_tx, @@ -655,20 +653,18 @@ async fn fulfill_request( } // this provider failed and needs to be reset let mut is_reset_successful = true; - providers - .entry(chain_id) - .and_modify(|aps| { - let Some(index) = find_index( - &aps.urls.iter().map(|u| u.url.as_str()).collect(), - &url_provider.url) else - { - is_reset_successful = false; - return (); - }; - let mut url = aps.urls.remove(index); - url.pubsub = None; - aps.urls.insert(index, url); - }); + providers.entry(chain_id).and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url, + ) else { + is_reset_successful = false; + return (); + }; + let mut url = aps.urls.remove(index); + url.pubsub = None; + aps.urls.insert(index, url); + }); if !is_reset_successful { verbose_print( print_tx, @@ -713,7 +709,8 @@ async fn fulfill_request( &chain_id, &node_provider.kns_update.name, print_tx, - ).await; + ) + .await; } } else { return response; @@ -1046,13 +1043,15 @@ async fn kernel_message( } fn find_index(vec: &Vec<&str>, item: &str) -> Option { - vec.iter().enumerate().find_map(|(index, value)| { - if *value == item { - Some(index) - } else { - None - } - }) + vec.iter().enumerate().find_map( + |(index, value)| { + if *value == item { + Some(index) + } else { + None + } + }, + ) } async fn set_node_unusable( @@ -1062,20 +1061,21 @@ async fn set_node_unusable( print_tx: &PrintSender, ) -> bool { let mut is_replacement_successful = true; - providers - .entry(chain_id.clone()) - .and_modify(|aps| { - let Some(index) = find_index( - &aps.nodes.iter().map(|n| n.kns_update.name.as_str()).collect(), - &node_name - ) else { - is_replacement_successful = false; - return (); - }; - let mut node = aps.nodes.remove(index); - node.usable = false; - aps.nodes.insert(index, node); - }); + providers.entry(chain_id.clone()).and_modify(|aps| { + let Some(index) = find_index( + &aps.nodes + .iter() + .map(|n| n.kns_update.name.as_str()) + .collect(), + &node_name, + ) else { + is_replacement_successful = false; + return (); + }; + let mut node = aps.nodes.remove(index); + node.usable = false; + aps.nodes.insert(index, node); + }); if !is_replacement_successful { verbose_print( print_tx, diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 737f5ec9e..3c7e37973 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -224,19 +224,17 @@ async fn build_subscription( Ok(sub) => { let rx = sub.into_raw(); let mut is_replacement_successful = true; - providers - .entry(chain_id) - .and_modify(|aps| { - let Some(index) = find_index( - &aps.urls.iter().map(|u| u.url.as_str()).collect(), - &url_provider.url) else - { - is_replacement_successful = false; - return (); - }; - aps.urls.remove(index); - aps.urls.insert(0, url_provider.clone()); - }); + providers.entry(chain_id).and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url, + ) else { + is_replacement_successful = false; + return (); + }; + aps.urls.remove(index); + aps.urls.insert(0, url_provider.clone()); + }); if !is_replacement_successful { verbose_print( print_tx, @@ -257,20 +255,18 @@ async fn build_subscription( .await; // this provider failed and needs to be reset let mut is_reset_successful = true; - providers - .entry(chain_id) - .and_modify(|aps| { - let Some(index) = find_index( - &aps.urls.iter().map(|u| u.url.as_str()).collect(), - &url_provider.url) else - { - is_reset_successful = false; - return (); - }; - let mut url = aps.urls.remove(index); - url.pubsub = None; - aps.urls.insert(index, url); - }); + providers.entry(chain_id).and_modify(|aps| { + let Some(index) = find_index( + &aps.urls.iter().map(|u| u.url.as_str()).collect(), + &url_provider.url, + ) else { + is_reset_successful = false; + return (); + }; + let mut url = aps.urls.remove(index); + url.pubsub = None; + aps.urls.insert(index, url); + }); if !is_reset_successful { verbose_print( print_tx, @@ -341,7 +337,8 @@ async fn build_subscription( &chain_id, &node_provider.kns_update.name, print_tx, - ).await; + ) + .await; } EthResponse::Err(e) => { if let EthError::RpcMalformedResponse = e { @@ -350,7 +347,8 @@ async fn build_subscription( &chain_id, &node_provider.kns_update.name, print_tx, - ).await; + ) + .await; } } } From 43733d955118b1fc73eb500c0dea61aa9a4e19be Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 18 Jul 2024 20:12:34 -0700 Subject: [PATCH 5/5] bump Cargo.lock --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2ed34e20..a695cf9c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3203,7 +3203,7 @@ dependencies = [ [[package]] name = "kinode" -version = "0.8.5" +version = "0.8.6" dependencies = [ "aes-gcm", "alloy", @@ -3259,7 +3259,7 @@ dependencies = [ [[package]] name = "kinode_lib" -version = "0.8.5" +version = "0.8.6" dependencies = [ "lib", ] @@ -3376,7 +3376,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "lib" -version = "0.8.5" +version = "0.8.6" dependencies = [ "alloy", "kit",