Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
2 changes: 1 addition & 1 deletion kinode/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
118 changes: 108 additions & 10 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ struct ActiveProviders {
pub nodes: Vec<NodeProvider>,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct UrlProvider {
pub trusted: bool,
pub url: String,
pub pubsub: Option<RootProvider<PubSubFrontend>>,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct NodeProvider {
/// NOT CURRENTLY USED
pub trusted: bool,
Expand Down Expand Up @@ -581,16 +581,20 @@ async fn fulfill_request(
let Some(method) = to_static_str(&method) else {
return EthResponse::Err(EthError::InvalidMethod(method.to_string()));
};
let Some(mut aps) = providers.get_mut(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
let mut urls = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
};
aps.urls.clone()
};

// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.

// bump the successful provider to the front of the list for future requests
for (index, url_provider) in aps.urls.iter_mut().enumerate() {
for url_provider in urls.iter_mut() {
let pubsub = match &url_provider.pubsub {
Some(pubsub) => pubsub,
None => {
Expand All @@ -613,8 +617,25 @@ async fn fulfill_request(
};
match pubsub.raw_request(method.into(), params.clone()).await {
Ok(value) => {
let successful_provider = aps.urls.remove(index);
aps.urls.insert(0, successful_provider);
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
return EthResponse::Response { value };
}
Err(rpc_error) => {
Expand All @@ -631,11 +652,38 @@ async fn fulfill_request(
return EthResponse::Err(EthError::RpcError(err));
}
// this provider failed and needs to be reset
url_provider.pubsub = None;
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_reset_successful = false;
return ();
};
let mut url = aps.urls.remove(index);
url.pubsub = None;
aps.urls.insert(index, url);
});
if !is_reset_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
}
}
}
for node_provider in &mut aps.nodes {

let nodes = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
};
aps.nodes.clone()
};
for node_provider in &nodes {
verbose_print(
print_tx,
&format!(
Expand All @@ -656,7 +704,13 @@ async fn fulfill_request(
.await;
if let EthResponse::Err(e) = response {
if let EthError::RpcMalformedResponse = e {
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
} else {
return response;
Expand Down Expand Up @@ -987,3 +1041,47 @@ async fn kernel_message<T: Serialize>(
})
.await;
}

fn find_index(vec: &Vec<&str>, item: &str) -> Option<usize> {
vec.iter().enumerate().find_map(
|(index, value)| {
if *value == item {
Some(index)
} else {
None
}
},
)
}

async fn set_node_unusable(
providers: &Providers,
chain_id: &u64,
node_name: &str,
print_tx: &PrintSender,
) -> bool {
let mut is_replacement_successful = true;
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.nodes
.iter()
.map(|n| n.kns_update.name.as_str())
.collect(),
&node_name,
) else {
is_replacement_successful = false;
return ();
};
let mut node = aps.nodes.remove(index);
node.usable = false;
aps.nodes.insert(index, node);
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
is_replacement_successful
}
79 changes: 70 additions & 9 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,21 @@ async fn build_subscription(
else {
return Err(EthError::PermissionDenied); // will never hit
};
let Some(mut aps) = providers.get_mut(&chain_id) else {
return Err(EthError::NoRpcForChain);
let mut urls = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return Err(EthError::NoRpcForChain);
};
aps.urls.clone()
};
let chain_id = chain_id.clone();

// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.

// bump the successful provider to the front of the list for future requests
for (index, url_provider) in aps.urls.iter_mut().enumerate() {
for url_provider in urls.iter_mut() {
let pubsub = match &url_provider.pubsub {
Some(pubsub) => pubsub,
None => {
Expand Down Expand Up @@ -217,8 +223,25 @@ async fn build_subscription(
{
Ok(sub) => {
let rx = sub.into_raw();
let successful_provider = aps.urls.remove(index);
aps.urls.insert(0, successful_provider);
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
return Ok(Ok(rx));
}
Err(rpc_error) => {
Expand All @@ -231,7 +254,26 @@ async fn build_subscription(
)
.await;
// this provider failed and needs to be reset
url_provider.pubsub = None;
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_reset_successful = false;
return ();
};
let mut url = aps.urls.remove(index);
url.pubsub = None;
aps.urls.insert(index, url);
});
if !is_reset_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
}
}
}
Expand All @@ -241,7 +283,14 @@ async fn build_subscription(
// we need to create our own unique sub id because in the remote provider node,
// all subs will be identified under our process address.
let remote_sub_id = rand::random();
for node_provider in &mut aps.nodes {
let nodes = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return Err(EthError::NoRpcForChain);
};
aps.nodes.clone()
};
for node_provider in &nodes {
verbose_print(
&print_tx,
&format!(
Expand Down Expand Up @@ -283,11 +332,23 @@ async fn build_subscription(
}
EthResponse::Response { .. } => {
// the response to a SubscribeLogs request must be an 'ok'
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
EthResponse::Err(e) => {
if let EthError::RpcMalformedResponse = e {
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions kinode/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ async fn handle_request(
.query_map(rusqlite::params_from_iter(parameters.iter()), |row| {
let mut map = HashMap::new();
for (i, column_name) in column_names.iter().enumerate() {
let value: SqlValue = row.get(i)?;
let value: Option<SqlValue> = row.get(i)?;
let value_json = match value {
SqlValue::Integer(int) => serde_json::Value::Number(int.into()),
SqlValue::Real(real) => serde_json::Value::Number(
Some(SqlValue::Integer(int)) => serde_json::Value::Number(int.into()),
Some(SqlValue::Real(real)) => serde_json::Value::Number(
serde_json::Number::from_f64(real).unwrap(),
),
SqlValue::Text(text) => serde_json::Value::String(text),
SqlValue::Blob(blob) => {
Some(SqlValue::Text(text)) => serde_json::Value::String(text),
Some(SqlValue::Blob(blob)) => {
serde_json::Value::String(base64_standard.encode(blob))
} // or another representation if you prefer
_ => serde_json::Value::Null,
Expand Down
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down