diff --git a/Cargo.lock b/Cargo.lock index a695cf9c5..da4ff72a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3203,7 +3203,7 @@ dependencies = [ [[package]] name = "kinode" -version = "0.8.6" +version = "0.8.7" dependencies = [ "aes-gcm", "alloy", @@ -3259,7 +3259,7 @@ dependencies = [ [[package]] name = "kinode_lib" -version = "0.8.6" +version = "0.8.7" dependencies = [ "lib", ] @@ -3376,7 +3376,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "lib" -version = "0.8.6" +version = "0.8.7" dependencies = [ "alloy", "kit", diff --git a/Cargo.toml b/Cargo.toml index e37cf88ea..f4bb93eef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode_lib" authors = ["KinodeDAO"] -version = "0.8.6" +version = "0.8.7" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/Dockerfile b/Dockerfile index fd6d6b756..2f1f5822d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ FROM debian:12-slim AS downloader +ARG VERSION WORKDIR /tmp/download RUN apt-get update -RUN apt-get install wget curl openssl jq unzip -y +RUN apt-get install unzip -y -ADD https://api.github.com/repos/kinode-dao/kinode/releases releases.json -RUN wget "https://github.com/kinode-dao/kinode/releases/download/$(cat releases.json | jq -r '.[0].tag_name')/kinode-x86_64-unknown-linux-gnu.zip" +ADD "https://github.com/kinode-dao/kinode/releases/download/${VERSION}/kinode-x86_64-unknown-linux-gnu.zip" kinode-x86_64-unknown-linux-gnu.zip RUN unzip kinode-x86_64-unknown-linux-gnu.zip FROM debian:12-slim diff --git a/README.md b/README.md index b7311aa4a..68c4645d7 100644 --- a/README.md +++ b/README.md @@ -163,15 +163,17 @@ The image includes EXPOSE directives for TCP port `8080` and TCP port `9000`. Po If you are running a direct node, you must map port `9000` to the same port on the host and on your router. Otherwise, your Kinode will not be able to connect to the rest of the network as connection info is written to the chain, and this information is based on the view from inside the Docker container. To build a local Docker image, run the following command in this project root. -``` -docker build -t 0xlynett/kinode . +```bash +# The `VERSION` may be replaced with the tag of a GitHub release +docker build -t 0xlynett/kinode . --build-arg VERSION=v0.8.6 ``` For example: -``` + +```bash docker volume create kinode-volume docker run -d -p 8080:8080 -it --name my-kinode \ --mount type=volume,source=kinode-volume,destination=/kinode-home \ 0xlynett/kinode -``` +``` \ No newline at end of file diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index c0f5ee856..5238cca40 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode" authors = ["KinodeDAO"] -version = "0.8.6" +version = "0.8.7" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 53d640aad..cec64a51a 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -322,6 +322,7 @@ async fn handle_message( ) .await; } + Ok(()) } Message::Request(req) => { let timeout = req.expects_response.unwrap_or(60); @@ -330,7 +331,7 @@ async fn handle_message( }; match req { IncomingReq::EthAction(eth_action) => { - return handle_eth_action(state, km, timeout, eth_action).await; + handle_eth_action(state, km, timeout, eth_action).await } IncomingReq::EthConfigAction(eth_config_action) => { kernel_message( @@ -344,29 +345,47 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::EthSubResult(eth_sub_result) => { // forward this to rsvp, if we have the sub id in our active subs let Some(rsvp) = km.rsvp else { + verbose_print( + &state.print_tx, + "eth: got eth_sub_result with no rsvp, ignoring", + ) + .await; return Ok(()); // no rsvp, no need to forward }; let sub_id = match eth_sub_result { Ok(EthSub { id, .. }) => id, Err(EthSubError { id, .. }) => id, }; - if let Some(sub_map) = state.active_subscriptions.get(&rsvp) { - if let Some(ActiveSub::Remote { - provider_node, - sender, - .. - }) = sub_map.get(&sub_id) - { - if provider_node == &km.source.node { - if let Ok(()) = sender.send(eth_sub_result).await { - // successfully sent a subscription update from a - // remote provider to one of our processes - return Ok(()); + if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) { + if let Some(sub) = sub_map.get(&sub_id) { + if let ActiveSub::Remote { + provider_node, + sender, + .. + } = sub + { + if provider_node == &km.source.node { + if let Ok(()) = sender.send(eth_sub_result).await { + // successfully sent a subscription update from a + // remote provider to one of our processes + return Ok(()); + } } + // failed to send subscription update to process, + // unsubscribe from provider and close + verbose_print( + &state.print_tx, + "eth: got eth_sub_result but provider node did not match or local sub was already closed", + ) + .await; + sub.close(sub_id, state).await; + sub_map.remove(&sub_id); + return Ok(()); } } } @@ -374,13 +393,16 @@ async fn handle_message( // so they can stop sending us updates verbose_print( &state.print_tx, - "eth: got eth_sub_result but no matching sub found, unsubscribing", + &format!( + "eth: got eth_sub_result but no matching sub {} found, unsubscribing", + sub_id + ), ) .await; kernel_message( - &state.our.clone(), + &state.our, km.id, - km.source.clone(), + km.source, None, true, None, @@ -388,6 +410,7 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::SubKeepalive(sub_id) => { // source expects that we have a local sub for them with this id @@ -420,11 +443,11 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } } } } - Ok(()) } async fn handle_eth_action( @@ -479,12 +502,32 @@ async fn handle_eth_action( .await; } EthAction::UnsubscribeLogs(sub_id) => { - let mut sub_map = state - .active_subscriptions - .entry(km.source.clone()) - .or_insert(HashMap::new()); + let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else { + verbose_print( + &state.print_tx, + &format!( + "eth: got unsubscribe from {} but no subscription found", + km.source + ), + ) + .await; + error_message( + &state.our, + km.id, + km.source, + EthError::MalformedRequest, + &state.send_to_loop, + ) + .await; + return Ok(()); + }; if let Some(sub) = sub_map.remove(&sub_id) { sub.close(sub_id, state).await; + verbose_print( + &state.print_tx, + &format!("eth: closed subscription {} for {}", sub_id, km.source.node), + ) + .await; kernel_message( &state.our, km.id, @@ -499,7 +542,10 @@ async fn handle_eth_action( } else { verbose_print( &state.print_tx, - "eth: got unsubscribe but no matching subscription found", + &format!( + "eth: got unsubscribe from {} but no subscription {} found", + km.source, sub_id + ), ) .await; error_message( @@ -626,8 +672,11 @@ async fn fulfill_request( is_replacement_successful = false; return (); }; - aps.urls.remove(index); - aps.urls.insert(0, url_provider.clone()); + let old_provider = aps.urls.remove(index); + match old_provider.pubsub { + None => aps.urls.insert(0, url_provider.clone()), + Some(_) => aps.urls.insert(0, old_provider), + } }); if !is_replacement_successful { verbose_print( diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 3c7e37973..21e78021c 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -113,7 +113,6 @@ pub async fn create_new_subscription( let (keepalive_err_sender, keepalive_err_receiver) = tokio::sync::mpsc::channel(1); response_channels.insert(keepalive_km_id, keepalive_err_sender); - let response_channels = response_channels.clone(); subs.insert( remote_sub_id, ActiveSub::Remote { @@ -232,8 +231,11 @@ async fn build_subscription( is_replacement_successful = false; return (); }; - aps.urls.remove(index); - aps.urls.insert(0, url_provider.clone()); + let old_provider = aps.urls.remove(index); + match old_provider.pubsub { + None => aps.urls.insert(0, url_provider.clone()), + Some(_) => aps.urls.insert(0, old_provider), + } }); if !is_replacement_successful { verbose_print( @@ -468,7 +470,7 @@ async fn maintain_remote_subscription( true, Some(30), IncomingReq::SubKeepalive(remote_sub_id), - &send_to_loop, + send_to_loop, ).await; } _incoming = net_error_rx.recv() => { @@ -485,6 +487,23 @@ async fn maintain_remote_subscription( } } }; + // tell provider node we don't need their services anymore + // (in case they did not close the subscription on their side, + // such as in the 2-hour timeout case) + kernel_message( + our, + rand::random(), + Address { + node: provider_node.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + None, + true, + None, + EthAction::UnsubscribeLogs(remote_sub_id), + send_to_loop, + ) + .await; active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { diff --git a/lib/Cargo.toml b/lib/Cargo.toml index b79fc8a7b..909693c2c 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lib" authors = ["KinodeDAO"] -version = "0.8.6" +version = "0.8.7" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org"