diff --git a/Cargo.lock b/Cargo.lock index bd1476831..e90bd38a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,21 +97,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ba1c79677c9ce51c8d45e20845b05e6fb070ea2c863fba03ad6af2c778474bd" dependencies = [ "alloy-consensus 0.1.4", - "alloy-contract", "alloy-core", "alloy-eips 0.1.4", "alloy-genesis 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", - "alloy-provider", - "alloy-pubsub", - "alloy-rpc-client", + "alloy-provider 0.1.4", + "alloy-rpc-client 0.1.4", "alloy-rpc-types 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-transport-http 0.1.4", +] + +[[package]] +name = "alloy" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-contract", + "alloy-core", + "alloy-eips 0.2.1", + "alloy-genesis 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-provider 0.2.1", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", "alloy-signer-local", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", ] @@ -153,21 +170,34 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-consensus" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-eips 0.2.1", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "serde", +] + [[package]] name = "alloy-contract" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dc6957ff706f9e5f6fd42f52a93e4bce476b726c92d077b348de28c4a76730c" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", - "alloy-network", + "alloy-network 0.2.1", + "alloy-network-primitives", "alloy-primitives", - "alloy-provider", + "alloy-provider 0.2.1", "alloy-pubsub", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.2.1", "alloy-sol-types", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "futures-util", "thiserror", @@ -230,6 +260,20 @@ dependencies = [ "sha2", ] +[[package]] +name = "alloy-eips" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "once_cell", + "serde", + "sha2", +] + [[package]] name = "alloy-genesis" version = "0.1.0" @@ -251,6 +295,16 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-genesis" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-json-abi" version = "0.7.7" @@ -288,6 +342,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "alloy-json-rpc" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "serde", + "serde_json", + "thiserror", + "tracing", +] + [[package]] name = "alloy-network" version = "0.1.4" @@ -298,9 +365,9 @@ dependencies = [ "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", "alloy-primitives", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-signer 0.1.4", "alloy-sol-types", "async-trait", "auto_impl", @@ -308,6 +375,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-network" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", + "alloy-sol-types", + "async-trait", + "auto_impl", + "futures-utils-wasm", + "thiserror", +] + +[[package]] +name = "alloy-network-primitives" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-primitives" version = "0.7.7" @@ -340,18 +437,50 @@ dependencies = [ "alloy-consensus 0.1.4", "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", + "alloy-network 0.1.4", "alloy-primitives", - "alloy-pubsub", - "alloy-rpc-client", - "alloy-rpc-types-eth", + "alloy-rpc-client 0.1.4", + "alloy-rpc-types-eth 0.1.4", "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport-http 0.1.4", + "async-stream", + "async-trait", + "auto_impl", + "dashmap 5.5.3", + "futures", + "futures-utils-wasm", + "lru", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "alloy-provider" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-chains", + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types-eth 0.2.1", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", - "dashmap", + "dashmap 6.0.1", "futures", "futures-utils-wasm", "lru", @@ -359,6 +488,7 @@ dependencies = [ "reqwest 0.12.5", "serde", "serde_json", + "thiserror", "tokio", "tracing", "url", @@ -366,13 +496,12 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a7341322d9bc0e49f6e9fd9f2eb8e30f73806f2dd12cbb3d6bab2694c921f87" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ - "alloy-json-rpc 0.1.4", + "alloy-json-rpc 0.2.1", "alloy-primitives", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "bimap", "futures", "serde", @@ -412,10 +541,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ba31bae67773fd5a60020bea900231f8396202b7feca4d0c70c6b59308ab4a8" dependencies = [ "alloy-json-rpc 0.1.4", + "alloy-transport 0.1.4", + "alloy-transport-http 0.1.4", + "futures", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", + "url", +] + +[[package]] +name = "alloy-rpc-client" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-json-rpc 0.2.1", "alloy-primitives", "alloy-pubsub", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", "futures", "pin-project", @@ -453,10 +602,20 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "184a7a42c7ba9141cc9e76368356168c282c3bc3d9e5d78f3556bdfe39343447" dependencies = [ - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", ] +[[package]] +name = "alloy-rpc-types" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-rpc-types-eth" version = "0.1.4" @@ -475,6 +634,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-rpc-types-eth" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "alloy-sol-types", + "itertools 0.13.0", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "alloy-serde" version = "0.1.0" @@ -496,6 +673,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "alloy-serde" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "serde", + "serde_json", +] + [[package]] name = "alloy-signer" version = "0.1.4" @@ -510,16 +697,28 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-signer" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-primitives", + "async-trait", + "auto_impl", + "elliptic-curve", + "k256", + "thiserror", +] + [[package]] name = "alloy-signer-local" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfc9c26fe6c6f1bad818c9a976de9044dd12e1f75f1f156a801ee3e8148c1b6" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ - "alloy-consensus 0.1.4", - "alloy-network", + "alloy-consensus 0.2.1", + "alloy-network 0.2.1", "alloy-primitives", - "alloy-signer", + "alloy-signer 0.2.1", "async-trait", "k256", "rand 0.8.5", @@ -636,6 +835,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-json-rpc 0.2.1", + "base64 0.22.1", + "futures-util", + "futures-utils-wasm", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-http" version = "0.1.4" @@ -651,14 +868,27 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-http" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" +dependencies = [ + "alloy-json-rpc 0.2.1", + "alloy-transport 0.2.1", + "reqwest 0.12.5", + "serde_json", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-ws" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aec83fd052684556c78c54df111433493267234d82321c2236560c752f595f20" +version = "0.2.1" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-pubsub", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "http 1.1.0", "rustls", @@ -1878,6 +2108,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3282,10 +3526,10 @@ dependencies = [ [[package]] name = "kinode" -version = "0.9.0" +version = "0.9.1" dependencies = [ "aes-gcm", - "alloy", + "alloy 0.2.1", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3296,7 +3540,7 @@ dependencies = [ "chrono", "clap", "crossterm", - "dashmap", + "dashmap 5.5.3", "flate2", "futures", "generic-array", @@ -3340,7 +3584,7 @@ dependencies = [ [[package]] name = "kinode_lib" -version = "0.9.0" +version = "0.9.1" dependencies = [ "lib", ] @@ -3372,7 +3616,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?tag=v0.9.0#284f202376b3cd3ce0c03aa660a006fc6187f236" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3394,7 +3638,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?branch=develop#5c1d8ed36cf10688808c09357ef0e43225396097" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3511,9 +3755,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "lib" -version = "0.9.0" +version = "0.9.1" dependencies = [ - "alloy", + "alloy 0.2.1", "kit 0.6.8", "lazy_static", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 1c389fc58..0ce8d435e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode_lib" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index c0e15786e..eb8d02727 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" @@ -26,7 +26,7 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/kinode-dao/alloy.git", rev = "e672f3e", features = [ "consensus", "contract", "json-rpc", diff --git a/kinode/packages/app_store/app_store/src/http_api.rs b/kinode/packages/app_store/app_store/src/http_api.rs index 321e9f72a..98ea77a30 100644 --- a/kinode/packages/app_store/app_store/src/http_api.rs +++ b/kinode/packages/app_store/app_store/src/http_api.rs @@ -8,7 +8,7 @@ use crate::{ use kinode_process_lib::{ http::{self, server, Method, StatusCode}, - println, Address, LazyLoadBlob, PackageId, Request, + Address, LazyLoadBlob, PackageId, Request, }; use kinode_process_lib::{SendError, SendErrorKind}; use serde_json::json; @@ -226,13 +226,6 @@ fn get_package_id(url_params: &HashMap) -> anyhow::Result) -> anyhow::Result { - let Some(version_hash) = url_params.get("version_hash") else { - return Err(anyhow::anyhow!("Missing version_hash")); - }; - Ok(version_hash.to_string()) -} - fn gen_package_info(id: &PackageId, state: &PackageState) -> serde_json::Value { // installed package info json!({ diff --git a/kinode/packages/app_store/app_store/src/state.rs b/kinode/packages/app_store/app_store/src/state.rs index 23619a026..5027c7a4b 100644 --- a/kinode/packages/app_store/app_store/src/state.rs +++ b/kinode/packages/app_store/app_store/src/state.rs @@ -1,5 +1,5 @@ use crate::{utils, VFS_TIMEOUT}; -use kinode_process_lib::{kimap, println, vfs, PackageId}; +use kinode_process_lib::{kimap, vfs, PackageId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; diff --git a/kinode/packages/app_store/app_store/src/utils.rs b/kinode/packages/app_store/app_store/src/utils.rs index 6bf4bf200..e04e42b6d 100644 --- a/kinode/packages/app_store/app_store/src/utils.rs +++ b/kinode/packages/app_store/app_store/src/utils.rs @@ -11,10 +11,7 @@ use { get_blob, kernel_types as kt, println, vfs, Address, LazyLoadBlob, PackageId, ProcessId, Request, }, - std::{ - collections::{HashMap, HashSet}, - str::FromStr, - }, + std::collections::{HashMap, HashSet}, }; // quite annoyingly, we must convert from our gen'd version of PackageId diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 61d0b1d42..0d596c543 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -12,7 +12,7 @@ use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, - print_to_terminal, println, Address, Message, PackageId, Request, Response, + print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, }; use std::{ collections::{HashMap, HashSet}, @@ -40,10 +40,7 @@ const KIMAP_ADDRESS: &'static str = kimap::KIMAP_ADDRESS; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_ADDRESS: &str = "0xcA92476B2483aBD5D82AEBF0b56701Bb2e9be658"; -#[cfg(not(feature = "simulation-mode"))] -const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; -#[cfg(feature = "simulation-mode")] -const KIMAP_FIRST_BLOCK: u64 = 1; +const DELAY_MS: u64 = 1_000; // 1s #[derive(Debug, Serialize, Deserialize)] pub struct State { @@ -106,7 +103,18 @@ fn init(our: Address) { } fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { - if message.is_request() { + if !message.is_request() { + if message.is_local(&our) && message.source().process == "timer:distro:sys" { + // handling of ETH RPC subscriptions delayed by DELAY_MS + // to allow kns to have a chance to process block: handle now + let Some(context) = message.context() else { + return Err(anyhow::anyhow!("foo")); + }; + let log = serde_json::from_slice(context)?; + handle_eth_log(our, state, log)?; + return Ok(()); + } + } else { let req: Req = serde_json::from_slice(message.body())?; match req { Req::Eth(eth_result) => { @@ -118,8 +126,10 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } if let Ok(eth::EthSub { result, .. }) = eth_result { - if let eth::SubscriptionResult::Log(log) = result { - handle_eth_log(our, state, *log)?; + if let eth::SubscriptionResult::Log(ref log) = result { + // delay handling of ETH RPC subscriptions by DELAY_MS + // to allow kns to have a chance to process block + timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { // attempt to resubscribe @@ -130,21 +140,15 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } Req::Request(chains) => { - handle_local_request(our, state, chains)?; + handle_local_request(state, chains)?; } } - } else { - return Err(anyhow::anyhow!("not a request")); } Ok(()) } -fn handle_local_request( - our: &Address, - state: &mut State, - req: ChainRequests, -) -> anyhow::Result<()> { +fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { let onchain_app = state @@ -244,7 +248,6 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re // the app store exclusively looks for ~metadata-uri postings: if one is // observed, we then *query* for ~metadata-hash to verify the content // at the URI. - // let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); let is_our_package = &package_id.publisher() == &our.node(); @@ -254,7 +257,21 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re let hash_note = format!("~metadata-hash.{}", note.parent_path); // owner can change which we don't track (yet?) so don't save, need to get when desired - let (tba, _owner, data) = state.kimap.get(&hash_note).map_err(|e| { + let (tba, _owner, data) = match state.kimap.get(&hash_note) { + Ok(gr) => Ok(gr), + Err(e) => match e { + eth::EthError::RpcError(_) => { + // retry on RpcError after DELAY_MS sleep + // sleep here rather than with, e.g., a message to + // `timer:distro:sys` so that events are processed in + // order of receipt + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + state.kimap.get(&hash_note) + } + _ => Err(e), + }, + } + .map_err(|e| { println!("Couldn't find {hash_note}: {e:?}"); anyhow::anyhow!("metadata hash mismatch") })?; @@ -264,9 +281,7 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re // if ~metadata-uri is also empty, this is an unpublish action! if metadata_uri.is_empty() { state.published.remove(&package_id); - if is_our_package { - state.listings.remove(&package_id); - } + state.listings.remove(&package_id); return Ok(()); } return Err(anyhow::anyhow!("metadata hash not found")); @@ -341,6 +356,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs + println!("subscribing..."); state.kimap.provider.subscribe_loop(1, filter.clone()); for log in fetch_logs( &state.kimap.provider, diff --git a/kinode/packages/app_store/download/src/lib.rs b/kinode/packages/app_store/download/src/lib.rs index cba03b02e..a777f5f6f 100644 --- a/kinode/packages/app_store/download/src/lib.rs +++ b/kinode/packages/app_store/download/src/lib.rs @@ -62,7 +62,7 @@ fn init(our: Address) { }; match response { - DownloadResponses::Error(e) => { + DownloadResponses::Error(_e) => { println!("download: error"); } DownloadResponses::Success => { diff --git a/kinode/packages/app_store/downloads/src/lib.rs b/kinode/packages/app_store/downloads/src/lib.rs index e6f63f4e5..4048bede4 100644 --- a/kinode/packages/app_store/downloads/src/lib.rs +++ b/kinode/packages/app_store/downloads/src/lib.rs @@ -7,17 +7,13 @@ use crate::kinode::process::downloads::{ DownloadResponses, Entry, FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest, }; -use std::{ - collections::{HashMap, HashSet}, - io::Read, - str::FromStr, -}; +use std::{collections::HashSet, io::Read, str::FromStr}; use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer}; use kinode_process_lib::{ await_message, call_init, get_blob, get_state, http::client, - kernel_types as kt, print_to_terminal, println, set_state, + print_to_terminal, println, set_state, vfs::{self, Directory, File}, Address, Message, PackageId, ProcessId, Request, Response, }; @@ -113,7 +109,7 @@ fn handle_message( state: &mut State, message: &Message, downloads: &mut Directory, - tmp: &mut Directory, + _tmp: &mut Directory, auto_updates: &mut HashSet<(PackageId, String)>, ) -> anyhow::Result<()> { if message.is_request() { diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index c16430201..315d9641f 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -49,7 +49,8 @@ "kns_indexer:kns_indexer:sys", "vfs:distro:sys", "http_client:distro:sys", - "eth:distro:sys" + "eth:distro:sys", + "timer:distro:sys" ], "public": false }, @@ -98,4 +99,4 @@ ], "public": false } -] \ No newline at end of file +] diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index 54d875285..f65b58a3e 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,6 +3749,8 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", + "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", + "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/app_store/ui/src/pages/PublishPage.tsx b/kinode/packages/app_store/ui/src/pages/PublishPage.tsx index 2692c5372..bdd28c25e 100644 --- a/kinode/packages/app_store/ui/src/pages/PublishPage.tsx +++ b/kinode/packages/app_store/ui/src/pages/PublishPage.tsx @@ -150,11 +150,12 @@ export default function PublishPage() { address: tba as `0x${string}`, functionName: 'execute', args: [ - KIMAP, + MULTICALL, BigInt(0), multicall, 1 - ] + ], + gas: BigInt(1000000), }); } catch (error) { diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 5e14c5475..4b93a38b3 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,7 +4,7 @@ use crate::kinode::process::kns_indexer::{ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message, + await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; @@ -36,6 +36,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism const KIMAP_FIRST_BLOCK: u64 = 1; // local const MAX_PENDING_ATTEMPTS: u8 = 3; +const SUBSCRIPTION_TIMEOUT: u64 = 60; +const DELAY_MS: u64 = 1_000; // 1s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { @@ -113,7 +115,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, 60); + let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT); print_to_terminal( 1, @@ -128,16 +130,17 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - listen_to_new_blocks(); // sub_id: 3 - - // if block in state is < current_block, get logs from that part. - println!("syncing old logs..."); // if subscription results come back in the wrong order, we store them here // until the right block is reached. - let mut pending_requests: BTreeMap> = BTreeMap::new(); + + // pending_requests temporarily on timeout. + // very naughty. + // let mut pending_requests: BTreeMap> = BTreeMap::new(); let mut pending_notes: BTreeMap> = BTreeMap::new(); + // if block in state is < current_block, get logs from that part. + println!("syncing old logs..."); fetch_and_process_logs( ð_provider, &mut state, @@ -156,7 +159,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { let Ok(message) = await_message() else { continue; }; + // if true, time to go check current block number and handle pending notes. + let tick = message.is_local(&our) && message.source().process == "timer:distro:sys"; let Message::Request { source, body, .. } = message else { + if tick { + handle_eth_message( + &mut state, + ð_provider, + tick, + &mut pending_notes, + &[], + &mints_filter, + ¬es_filter, + )?; + } continue; }; @@ -164,7 +180,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { handle_eth_message( &mut state, ð_provider, - &mut pending_requests, + tick, &mut pending_notes, &body, &mints_filter, @@ -178,45 +194,24 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ref hash, ref block, }) => { - // make sure we've seen the whole block - if *block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(*block) - .or_insert(vec![]) - .push(request); - } + // TODO: make sure we've seen the whole block, while actually + // sending a response to the proper place. + Response::new() + .body(serde_json::to_vec(&IndexerResponses::Name( + state.names.get(hash).cloned(), + ))?) + .send()?; } + IndexerRequests::NodeInfo(NodeInfoRequest { ref name, block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new() + .body(serde_json::to_vec(&IndexerResponses::NodeInfo( + state.nodes.get(name).cloned(), + ))?) + .send()?; } IndexerRequests::GetState(GetStateRequest { block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new().body(serde_json::to_vec(&state)?).send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new().body(serde_json::to_vec(&state)?).send()?; } } } @@ -226,7 +221,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { fn handle_eth_message( state: &mut State, eth_provider: ð::Provider, - pending_requests: &mut BTreeMap>, + tick: bool, pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, @@ -238,85 +233,29 @@ fn handle_eth_message( if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } - } else if let eth::SubscriptionResult::Header(header) = result { - if let Some(block) = header.number { - // risque.. - // pending_requests/notes are kicked off with block numbers - // that are ahead of state.last_block. can be risky if event subscriptions and newHeads - // are completely out of sync. - state.last_block = block; - } } } Ok(Err(e)) => { - print_to_terminal( - 1, - &format!("got eth subscription error ({e:?}), resubscribing"), - ); + println!("got eth subscription error ({e:?}), resubscribing"); if e.id == 1 { eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); - } else if e.id == 3 { - listen_to_new_blocks(); } } - Err(e) => { - return Err(e.into()); + _ => {} + } + if tick { + let block_number = eth_provider.get_block_number(); + if let Ok(block_number) = block_number { + print_to_terminal(2, &format!("new block: {}", block_number)); + state.last_block = block_number; } } - - handle_pending_requests(state, pending_requests)?; handle_pending_notes(state, pending_notes)?; - Ok(()) -} -fn handle_pending_requests( - state: &mut State, - pending_requests: &mut BTreeMap>, -) -> anyhow::Result<()> { - // check the pending_requests btreemap to see if there are any requests that - // can be handled now that the state block has been updated - if pending_requests.is_empty() { - return Ok(()); - } - let mut blocks_to_remove = vec![]; - for (block, requests) in pending_requests.iter() { - // make sure we've seen the whole block - if *block < state.last_block { - for request in requests.iter() { - match request { - IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::GetState(GetStateRequest { .. }) => { - Response::new() - .body(serde_json::to_vec(&state)?) - .send() - .unwrap(); - } - } - } - blocks_to_remove.push(*block); - } else { - break; - } - } - for block in blocks_to_remove.iter() { - pending_requests.remove(block); + if !pending_notes.is_empty() { + timer::set_timer(DELAY_MS, None); } Ok(()) @@ -337,10 +276,7 @@ fn handle_pending_notes( for (note, attempt) in notes.drain(..) { if attempt >= MAX_PENDING_ATTEMPTS { // skip notes that have exceeded max attempts - print_to_terminal( - 1, - &format!("dropping note from block {block} after {attempt} attempts"), - ); + println!("dropping note from block {block} after {attempt} attempts"); continue; } if let Err(e) = handle_note(state, ¬e) { @@ -458,6 +394,10 @@ fn handle_log( pending_notes: &mut BTreeMap>, log: ð::Log, ) -> anyhow::Result<()> { + if let Some(block) = log.block_number { + state.last_block = block; + } + match log.topics()[0] { kimap::contract::Mint::SIGNATURE_HASH => { let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap(); @@ -493,22 +433,15 @@ fn handle_log( if !kimap::valid_note(¬e) { return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - - if let Err(e) = handle_note(state, &decoded) { - match e.downcast_ref::() { - None => print_to_terminal(1, &format!("note handling error: {e:?}")), - Some(ee) => match ee { - KnsError::NoParentError => { - print_to_terminal(1, &format!("note awaiting mint: place in pending")); - if let Some(block_number) = log.block_number { - pending_notes - .entry(block_number) - .or_default() - .push((decoded, 0)); - } - } - }, - } + if let Some(block_number) = log.block_number { + print_to_terminal( + 1, + &format!("adding note to pending_notes for block {block_number}"), + ); + pending_notes + .entry(block_number) + .or_default() + .push((decoded, 0)); } } _log => { @@ -516,10 +449,6 @@ fn handle_log( } }; - if let Some(block) = log.block_number { - state.last_block = block; - } - Ok(()) } @@ -643,17 +572,3 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result { _ => Err(anyhow::anyhow!("Invalid byte length for port")), } } - -fn listen_to_new_blocks() { - let eth_newheads_sub = eth::EthAction::SubscribeLogs { - sub_id: 3, - chain_id: CHAIN_ID, - kind: eth::SubscriptionKind::NewHeads, - params: eth::Params::Bool(false), - }; - - Request::to(("our", "eth", "distro", "sys")) - .body(serde_json::to_vec(ð_newheads_sub).unwrap()) - .send() - .unwrap(); -} diff --git a/kinode/packages/kns_indexer/pkg/manifest.json b/kinode/packages/kns_indexer/pkg/manifest.json index d29393b94..863653260 100644 --- a/kinode/packages/kns_indexer/pkg/manifest.json +++ b/kinode/packages/kns_indexer/pkg/manifest.json @@ -7,11 +7,13 @@ "request_capabilities": [ "eth:distro:sys", "http_server:distro:sys", - "net:distro:sys" + "net:distro:sys", + "timer:distro:sys" ], "grant_capabilities": [ "eth:distro:sys", - "http_server:distro:sys" + "http_server:distro:sys", + "timer:distro:sys" ], "public": false } diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 77812ea2d..bd6802c96 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -100,7 +100,7 @@ type ResponseChannels = Arc>; #[derive(Debug)] enum ActiveSub { - Local(JoinHandle<()>), + Local((tokio::sync::mpsc::Sender, JoinHandle<()>)), Remote { provider_node: String, handle: JoinHandle<()>, @@ -111,8 +111,9 @@ enum ActiveSub { impl ActiveSub { async fn close(&self, sub_id: u64, state: &ModuleState) { match self { - ActiveSub::Local(handle) => { - handle.abort(); + ActiveSub::Local((close_sender, _handle)) => { + close_sender.send(true).await.unwrap(); + //handle.abort(); } ActiveSub::Remote { provider_node, @@ -506,13 +507,19 @@ async fn handle_eth_action( verbose_print( &state.print_tx, &format!( - "eth: handling {} from {}", + "eth: handling {} from {}; active_subs len: {:?}", + //"eth: handling {} from {}", match ð_action { EthAction::SubscribeLogs { .. } => "subscribe", EthAction::UnsubscribeLogs(_) => "unsubscribe", EthAction::Request { .. } => "request", }, - km.source + km.source, + state + .active_subscriptions + .iter() + .map(|v| v.len()) + .collect::>(), ), ) .await; diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 21e78021c..a49cbf972 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -70,40 +70,54 @@ pub async fn create_new_subscription( let send_to_loop = send_to_loop.clone(); let print_tx = print_tx.clone(); let active_subscriptions = active_subscriptions.clone(); + let providers = providers.clone(); + let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1); match maybe_raw_sub { - Ok(rx) => { + Ok((rx, chain_id)) => { subs.insert( sub_id, // this is a local sub, as in, we connect to the rpc endpoint - ActiveSub::Local(tokio::spawn(async move { - // await the subscription error and kill it if so - let e = maintain_local_subscription( - &our, - sub_id, - rx, - &target, - &rsvp, - &send_to_loop, - &active_subscriptions, - ) - .await; - verbose_print( - &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), - ) - .await; - kernel_message( - &our, - rand::random(), - target.clone(), - rsvp, - true, - None, - EthSubResult::Err(e), - &send_to_loop, - ) - .await; - })), + ActiveSub::Local(( + close_sender, + tokio::spawn(async move { + // await the subscription error and kill it if so + let r = maintain_local_subscription( + &our, + sub_id, + rx, + &target, + &rsvp, + &send_to_loop, + &active_subscriptions, + chain_id, + &providers, + close_receiver, + &print_tx, + ) + .await; + let Err(e) = r else { + return; + }; + verbose_print( + &print_tx, + &format!( + "eth: closed local subscription due to error {e:?}" + ), + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + rsvp, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + }), + )), ); } Err((provider_node, remote_sub_id)) => { @@ -169,7 +183,7 @@ async fn build_subscription( providers: &Providers, response_channels: &ResponseChannels, print_tx: &PrintSender, -) -> Result, EthError> { +) -> Result, EthError> { let EthAction::SubscribeLogs { chain_id, kind, @@ -244,7 +258,7 @@ async fn build_subscription( ) .await; } - return Ok(Ok(rx)); + return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { verbose_print( @@ -367,37 +381,79 @@ async fn maintain_local_subscription( rsvp: &Option
, send_to_loop: &MessageSender, active_subscriptions: &ActiveSubscriptions, -) -> EthSubError { - while let Ok(value) = rx.recv().await { - let result: SubscriptionResult = match serde_json::from_str(value.get()) { - Ok(res) => res, - Err(e) => { - return EthSubError { - id: sub_id, - error: e.to_string(), - } - } - }; - kernel_message( - our, - rand::random(), - target.clone(), - rsvp.clone(), - true, - None, - EthSubResult::Ok(EthSub { id: sub_id, result }), - &send_to_loop, - ) - .await; + chain_id: u64, + providers: &Providers, + mut close_receiver: tokio::sync::mpsc::Receiver, + print_tx: &PrintSender, +) -> Result<(), EthSubError> { + loop { + tokio::select! { + _ = close_receiver.recv() => { + unsubscribe(rx, &chain_id, providers, print_tx).await; + return Ok(()); + }, + value = rx.recv() => { + let Ok(value) = value else { + break; + }; + let result: SubscriptionResult = match serde_json::from_str(value.get()) { + Ok(res) => res, + Err(e) => { + return Err(EthSubError { + id: sub_id, + error: e.to_string(), + }); + } + }; + kernel_message( + our, + rand::random(), + target.clone(), + rsvp.clone(), + true, + None, + EthSubResult::Ok(EthSub { id: sub_id, result }), + &send_to_loop, + ) + .await; + }, + } } active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - EthSubError { + unsubscribe(rx, &chain_id, providers, print_tx).await; + Err(EthSubError { id: sub_id, - error: "subscription closed unexpectedly".to_string(), + error: format!("subscription ({target}) closed unexpectedly"), + }) +} + +async fn unsubscribe( + rx: RawSubscription, + chain_id: &u64, + providers: &Providers, + print_tx: &PrintSender, +) { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.clone().into(); + let Some(chain_providers) = providers.get_mut(chain_id) else { + return; //? + }; + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { + continue; + }; + if let Err(err) = pubsub.unsubscribe(alloy_sub_id) { + let _ = print_tx + .send(Printout { + verbosity: 0, + content: format!("unsubscribe from ETH RPC failed: {err:?}"), + }) + .await; + } } } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index c82aba34e..704ace6c7 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lib" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" @@ -15,7 +15,7 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/kinode-dao/alloy.git", rev = "e672f3e", features = [ "json-rpc", "rpc-types", "rpc-types-eth",