From 612c796fbf842ff688c87bd425c46b042ca140f8 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 15:21:41 +0300 Subject: [PATCH 01/22] kns_indexer: make newHeads subscription more robust --- .../kns_indexer/kns_indexer/src/lib.rs | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 5e14c5475..cef1b2cd1 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -36,6 +36,7 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism const KIMAP_FIRST_BLOCK: u64 = 1; // local const MAX_PENDING_ATTEMPTS: u8 = 3; +const SUBSCRIPTION_TIMEOUT: u64 = 60; #[derive(Clone, Debug, Serialize, Deserialize)] struct State { @@ -113,7 +114,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // 60s timeout -- these calls can take a long time // if they do time out, we try them again - let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, 60); + let eth_provider: eth::Provider = eth::Provider::new(state.chain_id, SUBSCRIPTION_TIMEOUT); print_to_terminal( 1, @@ -128,7 +129,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - listen_to_new_blocks(); // sub_id: 3 + listen_to_new_blocks_loop(); // sub_id: 3 // if block in state is < current_block, get logs from that part. println!("syncing old logs..."); @@ -250,7 +251,7 @@ fn handle_eth_message( } Ok(Err(e)) => { print_to_terminal( - 1, + 0, &format!("got eth subscription error ({e:?}), resubscribing"), ); if e.id == 1 { @@ -258,7 +259,7 @@ fn handle_eth_message( } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); } else if e.id == 3 { - listen_to_new_blocks(); + listen_to_new_blocks_loop(); } } Err(e) => { @@ -644,16 +645,39 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result { } } -fn listen_to_new_blocks() { - let eth_newheads_sub = eth::EthAction::SubscribeLogs { - sub_id: 3, - chain_id: CHAIN_ID, - kind: eth::SubscriptionKind::NewHeads, - params: eth::Params::Bool(false), - }; +fn listen_to_new_blocks_loop() { + loop { + let eth_newheads_sub = eth::EthAction::SubscribeLogs { + sub_id: 3, + chain_id: CHAIN_ID, + kind: eth::SubscriptionKind::NewHeads, + params: eth::Params::Bool(false), + }; - Request::to(("our", "eth", "distro", "sys")) - .body(serde_json::to_vec(ð_newheads_sub).unwrap()) - .send() - .unwrap(); + match Request::to(("our", "eth", "distro", "sys")) + .body(serde_json::to_vec(ð_newheads_sub).unwrap()) + .send_and_await_response(SUBSCRIPTION_TIMEOUT) + { + Ok(Ok(Message::Response { body, .. })) => { + match serde_json::from_slice::(&body) { + Ok(eth::EthResponse::Ok) => { + print_to_terminal(0, "successfully subscribed to newHeads"); + break; + } + Ok(eth::EthResponse::Err(e)) => { + print_to_terminal(0, &format!("failed to subscribe to new blocks: {e:?}")); + } + _ => { + print_to_terminal(0, "sailed to subscribe to new blocks, weird response."); + } + } + } + _ => { + print_to_terminal(0, "Failed to subscribe to new blocks, no response."); + } + } + + print_to_terminal(0, "retrying new block subscription in 5 seconds..."); + std::thread::sleep(std::time::Duration::from_secs(5)); + } } From 8226b0a09c818c3b31cfa9c9a8322a407ac6fc0c Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 16:11:49 +0300 Subject: [PATCH 02/22] kns_indexer: unsubscribe newHeads after pending cleared --- .../packages/kns_indexer/kns_indexer/src/lib.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index cef1b2cd1..e110b3027 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -129,7 +129,6 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { println!("subscribing to new logs..."); eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - listen_to_new_blocks_loop(); // sub_id: 3 // if block in state is < current_block, get logs from that part. println!("syncing old logs..."); @@ -220,6 +219,11 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } } } + + if !pending_requests.is_empty() || !pending_notes.is_empty() { + print_to_terminal(0, "subscribing to newHeads..."); + listen_to_new_blocks_loop(); // sub_id: 3 + } } } } @@ -269,6 +273,16 @@ fn handle_eth_message( handle_pending_requests(state, pending_requests)?; handle_pending_notes(state, pending_notes)?; + + // if both pending_requests and pending_notes are empty, we kill the newHeads subscription + if pending_requests.is_empty() && pending_notes.is_empty() { + if let Err(e) = eth_provider.unsubscribe(3) { + print_to_terminal(0, &format!("failed to unsubscribe from newHeads: {e:?}")); + } else { + print_to_terminal(0, "unsubscribed from newHeads"); + } + } + Ok(()) } From 311842f129c5f11fb93a2f997143c72e011b2dc4 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 16:16:15 +0300 Subject: [PATCH 03/22] kns_indexer: do not unsubscribe if sub not open --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index e110b3027..aef9788e3 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -49,6 +49,8 @@ struct State { nodes: HashMap, // last block we have an update from last_block: u64, + // whether we are listening for new blocks + listening_newblocks: bool, } // note: not defined in wit api right now like IndexerRequests. @@ -80,6 +82,7 @@ fn init(our: Address) { nodes: HashMap::new(), names: HashMap::new(), last_block: KIMAP_FIRST_BLOCK, + listening_newblocks: false, }; if let Err(e) = main(our, state) { @@ -220,9 +223,12 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } } - if !pending_requests.is_empty() || !pending_notes.is_empty() { + if !state.listening_newblocks + && (!pending_requests.is_empty() || !pending_notes.is_empty()) + { print_to_terminal(0, "subscribing to newHeads..."); listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; } } } @@ -275,10 +281,11 @@ fn handle_eth_message( handle_pending_notes(state, pending_notes)?; // if both pending_requests and pending_notes are empty, we kill the newHeads subscription - if pending_requests.is_empty() && pending_notes.is_empty() { + if state.listening_newblocks && pending_requests.is_empty() && pending_notes.is_empty() { if let Err(e) = eth_provider.unsubscribe(3) { print_to_terminal(0, &format!("failed to unsubscribe from newHeads: {e:?}")); } else { + state.listening_newblocks = false; print_to_terminal(0, "unsubscribed from newHeads"); } } From 2697f0a8f3f8db43323b474f8a67e0c2f4f0d590 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 16:41:49 +0300 Subject: [PATCH 04/22] kns: wip --- .../kns_indexer/kns_indexer/src/lib.rs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index aef9788e3..e78c71b7a 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -277,6 +277,7 @@ fn handle_eth_message( } } + println!("handling pending requests and notes..."); handle_pending_requests(state, pending_requests)?; handle_pending_notes(state, pending_notes)?; @@ -515,21 +516,20 @@ fn handle_log( if !kimap::valid_note(¬e) { return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - - if let Err(e) = handle_note(state, &decoded) { - match e.downcast_ref::() { - None => print_to_terminal(1, &format!("note handling error: {e:?}")), - Some(ee) => match ee { - KnsError::NoParentError => { - print_to_terminal(1, &format!("note awaiting mint: place in pending")); - if let Some(block_number) = log.block_number { - pending_notes - .entry(block_number) - .or_default() - .push((decoded, 0)); - } - } - }, + if let Some(block_number) = log.block_number { + print_to_terminal( + 0, + &format!("adding note to pending_notes for block {block_number}"), + ); + pending_notes + .entry(block_number) + .or_default() + .push((decoded, 0)); + + if !state.listening_newblocks && !pending_notes.is_empty() { + print_to_terminal(0, "subscribing to newHeads..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; } } } From f8a7e8fc5399ec2d201fbe28fabb8aefa5f9fcd3 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 08:14:36 -0700 Subject: [PATCH 05/22] kns: sub for notes --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index aef9788e3..8b32e25c2 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -531,6 +531,12 @@ fn handle_log( } }, } + + if !state.listening_newblocks && !pending_notes.is_empty() { + print_to_terminal(0, "subscribing to newHeads..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; + } } } _log => { From fe50f314d96d0c2e9f52fe6fda50d42b4eef0a07 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 09:48:49 -0700 Subject: [PATCH 06/22] add wip --- kinode/src/eth/mod.rs | 7 +- kinode/src/eth/subscription.rs | 149 +++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 58 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 77812ea2d..f6c67d852 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -100,7 +100,7 @@ type ResponseChannels = Arc>; #[derive(Debug)] enum ActiveSub { - Local(JoinHandle<()>), + Local((tokio::sync::mpsc::Sender, JoinHandle<()>)), Remote { provider_node: String, handle: JoinHandle<()>, @@ -111,8 +111,9 @@ enum ActiveSub { impl ActiveSub { async fn close(&self, sub_id: u64, state: &ModuleState) { match self { - ActiveSub::Local(handle) => { - handle.abort(); + ActiveSub::Local((close_sender, _handle)) => { + close_sender.send(true).await.unwrap(); + //handle.abort(); } ActiveSub::Remote { provider_node, diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 21e78021c..7a8ed7da9 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,4 +1,5 @@ use crate::eth::*; +use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -70,40 +71,51 @@ pub async fn create_new_subscription( let send_to_loop = send_to_loop.clone(); let print_tx = print_tx.clone(); let active_subscriptions = active_subscriptions.clone(); + let providers = providers.clone(); + let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1); match maybe_raw_sub { - Ok(rx) => { + Ok((rx, chain_id)) => { subs.insert( sub_id, // this is a local sub, as in, we connect to the rpc endpoint - ActiveSub::Local(tokio::spawn(async move { - // await the subscription error and kill it if so - let e = maintain_local_subscription( - &our, - sub_id, - rx, - &target, - &rsvp, - &send_to_loop, - &active_subscriptions, - ) - .await; - verbose_print( - &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), - ) - .await; - kernel_message( - &our, - rand::random(), - target.clone(), - rsvp, - true, - None, - EthSubResult::Err(e), - &send_to_loop, - ) - .await; - })), + ActiveSub::Local(( + close_sender, + tokio::spawn(async move { + // await the subscription error and kill it if so + let r = maintain_local_subscription( + &our, + sub_id, + rx, + &target, + &rsvp, + &send_to_loop, + &active_subscriptions, + chain_id, + &providers, + close_receiver, + ) + .await; + let Err(e) = r else { + return; + }; + verbose_print( + &print_tx, + &format!("eth: closed local subscription due to error {e:?}"), + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + rsvp, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + }), + )), ); } Err((provider_node, remote_sub_id)) => { @@ -169,7 +181,7 @@ async fn build_subscription( providers: &Providers, response_channels: &ResponseChannels, print_tx: &PrintSender, -) -> Result, EthError> { +) -> Result, EthError> { let EthAction::SubscribeLogs { chain_id, kind, @@ -244,7 +256,7 @@ async fn build_subscription( ) .await; } - return Ok(Ok(rx)); + return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { verbose_print( @@ -367,38 +379,65 @@ async fn maintain_local_subscription( rsvp: &Option
, send_to_loop: &MessageSender, active_subscriptions: &ActiveSubscriptions, -) -> EthSubError { - while let Ok(value) = rx.recv().await { - let result: SubscriptionResult = match serde_json::from_str(value.get()) { - Ok(res) => res, - Err(e) => { - return EthSubError { - id: sub_id, - error: e.to_string(), + chain_id: u64, + providers: &Providers, + mut close_receiver: tokio::sync::mpsc::Receiver, +) -> Result<(), EthSubError> { + loop { + tokio::select! { + _ = close_receiver.recv() => { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.into(); + //let alloy_sub_id = rx.local_id().; + //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); + //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let Some(chain_providers) = providers.get_mut(&chain_id) else { + return Ok(()); //? + }; + for url in chain_providers.urls { + let Some(pubsub) = url.pubsub else { + continue; + }; + pubsub.unsubscribe(alloy_sub_id); } - } - }; - kernel_message( - our, - rand::random(), - target.clone(), - rsvp.clone(), - true, - None, - EthSubResult::Ok(EthSub { id: sub_id, result }), - &send_to_loop, - ) - .await; + return Ok(()); + }, + value = rx.recv() => { + let Ok(value) = value else { + break; + }; + let result: SubscriptionResult = match serde_json::from_str(value.get()) { + Ok(res) => res, + Err(e) => { + return Err(EthSubError { + id: sub_id, + error: e.to_string(), + }); + } + }; + kernel_message( + our, + rand::random(), + target.clone(), + rsvp.clone(), + true, + None, + EthSubResult::Ok(EthSub { id: sub_id, result }), + &send_to_loop, + ) + .await; + }, + } } active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - EthSubError { + Err(EthSubError { id: sub_id, error: "subscription closed unexpectedly".to_string(), - } + }) } /// handle the subscription updates from a remote provider, From 9c20cf5c7c1411fc2ae52006805b8a798249fe36 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:49:14 +0000 Subject: [PATCH 07/22] Format Rust code using rustfmt --- kinode/src/eth/subscription.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 7a8ed7da9..cdc0bd466 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -100,7 +100,9 @@ pub async fn create_new_subscription( }; verbose_print( &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), + &format!( + "eth: closed local subscription due to error {e:?}" + ), ) .await; kernel_message( From e1106b40973a62f5aaf9463135a96f389f98b278 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 20:01:45 +0300 Subject: [PATCH 08/22] eth: borrow fixes unsubscribe --- kinode/src/eth/subscription.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index cdc0bd466..d78eac99a 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,5 +1,4 @@ use crate::eth::*; -use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -389,18 +388,16 @@ async fn maintain_local_subscription( tokio::select! { _ = close_receiver.recv() => { let alloy_sub_id = rx.local_id(); - let alloy_sub_id = alloy_sub_id.into(); - //let alloy_sub_id = rx.local_id().; - //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); - //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(&chain_id) else { return Ok(()); //? }; - for url in chain_providers.urls { - let Some(pubsub) = url.pubsub else { + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { continue; }; - pubsub.unsubscribe(alloy_sub_id); + let x = pubsub.unsubscribe(alloy_sub_id); + println!("we just tried unsubscribing unsubscribed: {:?}", x); } return Ok(()); }, From db9eda4d2e0211485bc1b9bdac0c5b68ef9b58f2 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 15:28:14 -0700 Subject: [PATCH 09/22] add some prints (and unsub in another case) --- Cargo.lock | 336 +++++++++++++++--- kinode/Cargo.toml | 3 +- kinode/packages/app_store/chain/src/lib.rs | 2 + .../packages/app_store/ui/package-lock.json | 2 + .../kns_indexer/kns_indexer/src/lib.rs | 18 +- kinode/src/eth/mod.rs | 6 +- kinode/src/eth/subscription.rs | 35 +- lib/Cargo.toml | 3 +- 8 files changed, 332 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd1476831..760f5844e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,21 +97,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ba1c79677c9ce51c8d45e20845b05e6fb070ea2c863fba03ad6af2c778474bd" dependencies = [ "alloy-consensus 0.1.4", - "alloy-contract", "alloy-core", "alloy-eips 0.1.4", "alloy-genesis 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", - "alloy-provider", - "alloy-pubsub", - "alloy-rpc-client", + "alloy-provider 0.1.4", + "alloy-rpc-client 0.1.4", "alloy-rpc-types 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-transport-http 0.1.4", +] + +[[package]] +name = "alloy" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-contract", + "alloy-core", + "alloy-eips 0.2.1", + "alloy-genesis 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-provider 0.2.1", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", "alloy-signer-local", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", ] @@ -153,21 +170,34 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-consensus" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-eips 0.2.1", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "serde", +] + [[package]] name = "alloy-contract" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dc6957ff706f9e5f6fd42f52a93e4bce476b726c92d077b348de28c4a76730c" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", - "alloy-network", + "alloy-network 0.2.1", + "alloy-network-primitives", "alloy-primitives", - "alloy-provider", + "alloy-provider 0.2.1", "alloy-pubsub", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.2.1", "alloy-sol-types", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "futures-util", "thiserror", @@ -230,6 +260,20 @@ dependencies = [ "sha2", ] +[[package]] +name = "alloy-eips" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "c-kzg", + "once_cell", + "serde", + "sha2", +] + [[package]] name = "alloy-genesis" version = "0.1.0" @@ -251,6 +295,16 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-genesis" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-json-abi" version = "0.7.7" @@ -288,6 +342,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "alloy-json-rpc" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "serde", + "serde_json", + "thiserror", + "tracing", +] + [[package]] name = "alloy-network" version = "0.1.4" @@ -298,9 +365,9 @@ dependencies = [ "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", "alloy-primitives", - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", - "alloy-signer", + "alloy-signer 0.1.4", "alloy-sol-types", "async-trait", "auto_impl", @@ -308,6 +375,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-network" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "alloy-signer 0.2.1", + "alloy-sol-types", + "async-trait", + "auto_impl", + "futures-utils-wasm", + "thiserror", +] + +[[package]] +name = "alloy-network-primitives" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-primitives" version = "0.7.7" @@ -340,18 +437,50 @@ dependencies = [ "alloy-consensus 0.1.4", "alloy-eips 0.1.4", "alloy-json-rpc 0.1.4", - "alloy-network", + "alloy-network 0.1.4", "alloy-primitives", - "alloy-pubsub", - "alloy-rpc-client", - "alloy-rpc-types-eth", + "alloy-rpc-client 0.1.4", + "alloy-rpc-types-eth 0.1.4", "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport-http 0.1.4", + "async-stream", + "async-trait", + "auto_impl", + "dashmap 5.5.3", + "futures", + "futures-utils-wasm", + "lru", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "alloy-provider" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-chains", + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-json-rpc 0.2.1", + "alloy-network 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-pubsub", + "alloy-rpc-client 0.2.1", + "alloy-rpc-types-eth 0.2.1", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", - "dashmap", + "dashmap 6.0.1", "futures", "futures-utils-wasm", "lru", @@ -359,6 +488,7 @@ dependencies = [ "reqwest 0.12.5", "serde", "serde_json", + "thiserror", "tokio", "tracing", "url", @@ -366,13 +496,12 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a7341322d9bc0e49f6e9fd9f2eb8e30f73806f2dd12cbb3d6bab2694c921f87" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ - "alloy-json-rpc 0.1.4", + "alloy-json-rpc 0.2.1", "alloy-primitives", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "bimap", "futures", "serde", @@ -412,10 +541,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ba31bae67773fd5a60020bea900231f8396202b7feca4d0c70c6b59308ab4a8" dependencies = [ "alloy-json-rpc 0.1.4", + "alloy-transport 0.1.4", + "alloy-transport-http 0.1.4", + "futures", + "pin-project", + "reqwest 0.12.5", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", + "url", +] + +[[package]] +name = "alloy-rpc-client" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", "alloy-primitives", "alloy-pubsub", - "alloy-transport 0.1.4", - "alloy-transport-http", + "alloy-transport 0.2.1", + "alloy-transport-http 0.2.1", "alloy-transport-ws", "futures", "pin-project", @@ -453,10 +602,20 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "184a7a42c7ba9141cc9e76368356168c282c3bc3d9e5d78f3556bdfe39343447" dependencies = [ - "alloy-rpc-types-eth", + "alloy-rpc-types-eth 0.1.4", "alloy-serde 0.1.4", ] +[[package]] +name = "alloy-rpc-types" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-rpc-types-eth 0.2.1", + "alloy-serde 0.2.1", + "serde", +] + [[package]] name = "alloy-rpc-types-eth" version = "0.1.4" @@ -475,6 +634,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-rpc-types-eth" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-consensus 0.2.1", + "alloy-eips 0.2.1", + "alloy-network-primitives", + "alloy-primitives", + "alloy-rlp", + "alloy-serde 0.2.1", + "alloy-sol-types", + "itertools 0.13.0", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "alloy-serde" version = "0.1.0" @@ -496,6 +673,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "alloy-serde" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "serde", + "serde_json", +] + [[package]] name = "alloy-signer" version = "0.1.4" @@ -510,16 +697,28 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-signer" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-primitives", + "async-trait", + "auto_impl", + "elliptic-curve", + "k256", + "thiserror", +] + [[package]] name = "alloy-signer-local" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfc9c26fe6c6f1bad818c9a976de9044dd12e1f75f1f156a801ee3e8148c1b6" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ - "alloy-consensus 0.1.4", - "alloy-network", + "alloy-consensus 0.2.1", + "alloy-network 0.2.1", "alloy-primitives", - "alloy-signer", + "alloy-signer 0.2.1", "async-trait", "k256", "rand 0.8.5", @@ -636,6 +835,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", + "base64 0.22.1", + "futures-util", + "futures-utils-wasm", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-http" version = "0.1.4" @@ -651,14 +868,27 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-http" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +dependencies = [ + "alloy-json-rpc 0.2.1", + "alloy-transport 0.2.1", + "reqwest 0.12.5", + "serde_json", + "tower", + "tracing", + "url", +] + [[package]] name = "alloy-transport-ws" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aec83fd052684556c78c54df111433493267234d82321c2236560c752f595f20" +version = "0.2.1" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" dependencies = [ "alloy-pubsub", - "alloy-transport 0.1.4", + "alloy-transport 0.2.1", "futures", "http 1.1.0", "rustls", @@ -1878,6 +2108,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3285,7 +3529,7 @@ name = "kinode" version = "0.9.0" dependencies = [ "aes-gcm", - "alloy", + "alloy 0.2.1", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3296,7 +3540,7 @@ dependencies = [ "chrono", "clap", "crossterm", - "dashmap", + "dashmap 5.5.3", "flate2", "futures", "generic-array", @@ -3372,7 +3616,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?tag=v0.9.0#284f202376b3cd3ce0c03aa660a006fc6187f236" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3394,7 +3638,7 @@ name = "kinode_process_lib" version = "0.9.0" source = "git+https://github.com/kinode-dao/process_lib?branch=develop#5c1d8ed36cf10688808c09357ef0e43225396097" dependencies = [ - "alloy", + "alloy 0.1.4", "alloy-primitives", "alloy-sol-macro", "alloy-sol-types", @@ -3513,7 +3757,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" name = "lib" version = "0.9.0" dependencies = [ - "alloy", + "alloy 0.2.1", "kit 0.6.8", "lazy_static", "rand 0.8.5", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index c0e15786e..6f78be02f 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,8 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -alloy = { version = "0.1.3", features = [ +#alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "consensus", "contract", "json-rpc", diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 61d0b1d42..ca7ae7d3a 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -123,6 +123,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } else { // attempt to resubscribe + println!("attempting resub"); state .kimap .provider @@ -341,6 +342,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) { let filter = app_store_filter(state); // get past logs, subscribe to new ones. // subscribe first so we don't miss any logs + println!("subscribing..."); state.kimap.provider.subscribe_loop(1, filter.clone()); for log in fetch_logs( &state.kimap.provider, diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index 54d875285..f65b58a3e 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,6 +3749,8 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", + "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", + "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 8b32e25c2..ce61063a4 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -223,10 +223,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } } - if !state.listening_newblocks - && (!pending_requests.is_empty() || !pending_notes.is_empty()) - { - print_to_terminal(0, "subscribing to newHeads..."); + if !state.listening_newblocks && !pending_requests.is_empty() { + print_to_terminal(0, "subscribing to newHeads for req..."); listen_to_new_blocks_loop(); // sub_id: 3 state.listening_newblocks = true; } @@ -269,6 +267,7 @@ fn handle_eth_message( } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); } else if e.id == 3 { + print_to_terminal(0, "subscribing to newHeads for retry..."); listen_to_new_blocks_loop(); } } @@ -527,16 +526,15 @@ fn handle_log( .entry(block_number) .or_default() .push((decoded, 0)); + if !state.listening_newblocks { + print_to_terminal(0, "subscribing to newHeads for note..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; + } } } }, } - - if !state.listening_newblocks && !pending_notes.is_empty() { - print_to_terminal(0, "subscribing to newHeads..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } _log => { diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index f6c67d852..d8b50ed3f 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -507,13 +507,15 @@ async fn handle_eth_action( verbose_print( &state.print_tx, &format!( - "eth: handling {} from {}", + "eth: handling {} from {}; active_subs len: {:?}", + //"eth: handling {} from {}", match ð_action { EthAction::SubscribeLogs { .. } => "subscribe", EthAction::UnsubscribeLogs(_) => "unsubscribe", EthAction::Request { .. } => "request", }, - km.source + km.source, + state.active_subscriptions.iter().map(|v| v.len()).collect::>(), ), ) .await; diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index d78eac99a..75334c232 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -257,6 +257,9 @@ async fn build_subscription( ) .await; } + let alloy_sub_id = rx.local_id(); + let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into(); + println!("{target} making sub {:?}", alloy_sub_id); return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { @@ -387,22 +390,12 @@ async fn maintain_local_subscription( loop { tokio::select! { _ = close_receiver.recv() => { - let alloy_sub_id = rx.local_id(); - let alloy_sub_id = alloy_sub_id.clone().into(); - let Some(chain_providers) = providers.get_mut(&chain_id) else { - return Ok(()); //? - }; - for url in chain_providers.urls.iter() { - let Some(pubsub) = url.pubsub.as_ref() else { - continue; - }; - let x = pubsub.unsubscribe(alloy_sub_id); - println!("we just tried unsubscribing unsubscribed: {:?}", x); - } + unsubscribe(rx, &chain_id, providers); return Ok(()); }, value = rx.recv() => { let Ok(value) = value else { + println!("sub failed: {:?}\r", value.unwrap_err()); break; }; let result: SubscriptionResult = match serde_json::from_str(value.get()) { @@ -433,12 +426,28 @@ async fn maintain_local_subscription( .and_modify(|sub_map| { sub_map.remove(&sub_id); }); + unsubscribe(rx, &chain_id, providers); Err(EthSubError { id: sub_id, - error: "subscription closed unexpectedly".to_string(), + error: format!("subscription ({target}) closed unexpectedly"), }) } +fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.clone().into(); + let Some(chain_providers) = providers.get_mut(chain_id) else { + return; //? + }; + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { + continue; + }; + let x = pubsub.unsubscribe(alloy_sub_id); + println!("we just tried unsubscribing {:?} unsubscribed: {:?}\r", alloy_sub_id, x); + } +} + /// handle the subscription updates from a remote provider, /// and also perform keepalive checks on that provider. /// current keepalive is 30s, this can be adjusted as desired diff --git a/lib/Cargo.toml b/lib/Cargo.toml index c82aba34e..a3add244a 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,8 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -alloy = { version = "0.1.3", features = [ +#alloy = { version = "0.1.3", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "json-rpc", "rpc-types", "rpc-types-eth", From 79c8abf3b689ca192257bf8824a3049edd1bd813 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 22:28:43 +0000 Subject: [PATCH 10/22] Format Rust code using rustfmt --- kinode/src/eth/mod.rs | 6 +++++- kinode/src/eth/subscription.rs | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index d8b50ed3f..bd6802c96 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -515,7 +515,11 @@ async fn handle_eth_action( EthAction::Request { .. } => "request", }, km.source, - state.active_subscriptions.iter().map(|v| v.len()).collect::>(), + state + .active_subscriptions + .iter() + .map(|v| v.len()) + .collect::>(), ), ) .await; diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 75334c232..7d2d5ce17 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -444,7 +444,10 @@ fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { continue; }; let x = pubsub.unsubscribe(alloy_sub_id); - println!("we just tried unsubscribing {:?} unsubscribed: {:?}\r", alloy_sub_id, x); + println!( + "we just tried unsubscribing {:?} unsubscribed: {:?}\r", + alloy_sub_id, x + ); } } From f0a66d056410d08536758fccd4958ee571be7b65 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 22 Aug 2024 04:19:00 +0300 Subject: [PATCH 11/22] kns_indexer: replace newHeads sub with ticker --- kinode/Cargo.toml | 1 - .../packages/app_store/ui/package-lock.json | 2 - .../kns_indexer/kns_indexer/src/lib.rs | 208 ++++-------------- kinode/packages/kns_indexer/pkg/manifest.json | 6 +- lib/Cargo.toml | 1 - 5 files changed, 48 insertions(+), 170 deletions(-) diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 6f78be02f..4f2c9be5d 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,6 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -#alloy = { version = "0.1.3", features = [ alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "consensus", "contract", diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index f65b58a3e..54d875285 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,8 +3749,6 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", - "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", - "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index ce61063a4..cd5059fa3 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -4,7 +4,7 @@ use crate::kinode::process::kns_indexer::{ use alloy_primitives::keccak256; use alloy_sol_types::SolEvent; use kinode_process_lib::{ - await_message, call_init, eth, kimap, net, print_to_terminal, println, Address, Message, + await_message, call_init, eth, kimap, net, print_to_terminal, println, timer, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; @@ -37,6 +37,7 @@ const KIMAP_FIRST_BLOCK: u64 = 1; // local const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; +const NEW_BLOCK_TICK: u64 = 3000; // 3s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { @@ -49,8 +50,6 @@ struct State { nodes: HashMap, // last block we have an update from last_block: u64, - // whether we are listening for new blocks - listening_newblocks: bool, } // note: not defined in wit api right now like IndexerRequests. @@ -82,7 +81,6 @@ fn init(our: Address) { nodes: HashMap::new(), names: HashMap::new(), last_block: KIMAP_FIRST_BLOCK, - listening_newblocks: false, }; if let Err(e) = main(our, state) { @@ -138,7 +136,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if subscription results come back in the wrong order, we store them here // until the right block is reached. - let mut pending_requests: BTreeMap> = BTreeMap::new(); + + // pending_requests temporarily on timeout. + // very naughty. + // let mut pending_requests: BTreeMap> = BTreeMap::new(); let mut pending_notes: BTreeMap> = BTreeMap::new(); fetch_and_process_logs( @@ -159,7 +160,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { let Ok(message) = await_message() else { continue; }; + // if true, time to go check current block number and handle pending notes. + let tick = message.is_local(&our) && message.source().process == "timer:distro:sys"; let Message::Request { source, body, .. } = message else { + if tick { + handle_eth_message( + &mut state, + ð_provider, + tick, + &mut pending_notes, + &[], + &mints_filter, + ¬es_filter, + )?; + } continue; }; @@ -167,7 +181,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { handle_eth_message( &mut state, ð_provider, - &mut pending_requests, + tick, &mut pending_notes, &body, &mints_filter, @@ -181,53 +195,26 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ref hash, ref block, }) => { - // make sure we've seen the whole block - if *block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(*block) - .or_insert(vec![]) - .push(request); - } + // TODO: make sure we've seen the whole block, while actually + // sending a response to the proper place. + Response::new() + .body(serde_json::to_vec(&IndexerResponses::Name( + state.names.get(hash).cloned(), + ))?) + .send()?; } + IndexerRequests::NodeInfo(NodeInfoRequest { ref name, block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new() + .body(serde_json::to_vec(&IndexerResponses::NodeInfo( + state.nodes.get(name).cloned(), + ))?) + .send()?; } IndexerRequests::GetState(GetStateRequest { block }) => { - // make sure we've seen the whole block - if block < state.last_block { - Response::new().body(serde_json::to_vec(&state)?).send()?; - } else { - pending_requests - .entry(block) - .or_insert(vec![]) - .push(request); - } + Response::new().body(serde_json::to_vec(&state)?).send()?; } } - - if !state.listening_newblocks && !pending_requests.is_empty() { - print_to_terminal(0, "subscribing to newHeads for req..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } } @@ -235,7 +222,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { fn handle_eth_message( state: &mut State, eth_provider: ð::Provider, - pending_requests: &mut BTreeMap>, + tick: bool, pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, @@ -247,14 +234,6 @@ fn handle_eth_message( if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } - } else if let eth::SubscriptionResult::Header(header) = result { - if let Some(block) = header.number { - // risque.. - // pending_requests/notes are kicked off with block numbers - // that are ahead of state.last_block. can be risky if event subscriptions and newHeads - // are completely out of sync. - state.last_block = block; - } } } Ok(Err(e)) => { @@ -266,78 +245,21 @@ fn handle_eth_message( eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { eth_provider.subscribe_loop(2, notes_filter.clone()); - } else if e.id == 3 { - print_to_terminal(0, "subscribing to newHeads for retry..."); - listen_to_new_blocks_loop(); } } - Err(e) => { - return Err(e.into()); - } + _ => {} } - - handle_pending_requests(state, pending_requests)?; - handle_pending_notes(state, pending_notes)?; - - // if both pending_requests and pending_notes are empty, we kill the newHeads subscription - if state.listening_newblocks && pending_requests.is_empty() && pending_notes.is_empty() { - if let Err(e) = eth_provider.unsubscribe(3) { - print_to_terminal(0, &format!("failed to unsubscribe from newHeads: {e:?}")); - } else { - state.listening_newblocks = false; - print_to_terminal(0, "unsubscribed from newHeads"); + if tick { + let block_number = eth_provider.get_block_number(); + if let Ok(block_number) = block_number { + print_to_terminal(1, &format!("new block: {}", block_number)); + state.last_block = block_number; } } + handle_pending_notes(state, pending_notes)?; - Ok(()) -} - -fn handle_pending_requests( - state: &mut State, - pending_requests: &mut BTreeMap>, -) -> anyhow::Result<()> { - // check the pending_requests btreemap to see if there are any requests that - // can be handled now that the state block has been updated - if pending_requests.is_empty() { - return Ok(()); - } - let mut blocks_to_remove = vec![]; - for (block, requests) in pending_requests.iter() { - // make sure we've seen the whole block - if *block < state.last_block { - for request in requests.iter() { - match request { - IndexerRequests::NamehashToName(NamehashToNameRequest { hash, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::Name( - state.names.get(hash).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::NodeInfo(NodeInfoRequest { name, .. }) => { - Response::new() - .body(serde_json::to_vec(&IndexerResponses::NodeInfo( - state.nodes.get(name).cloned(), - ))?) - .send() - .unwrap(); - } - IndexerRequests::GetState(GetStateRequest { .. }) => { - Response::new() - .body(serde_json::to_vec(&state)?) - .send() - .unwrap(); - } - } - } - blocks_to_remove.push(*block); - } else { - break; - } - } - for block in blocks_to_remove.iter() { - pending_requests.remove(block); + if !pending_notes.is_empty() { + timer::set_timer(NEW_BLOCK_TICK, None); } Ok(()) @@ -526,11 +448,6 @@ fn handle_log( .entry(block_number) .or_default() .push((decoded, 0)); - if !state.listening_newblocks { - print_to_terminal(0, "subscribing to newHeads for note..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } }, @@ -669,40 +586,3 @@ pub fn bytes_to_port(bytes: &[u8]) -> anyhow::Result { _ => Err(anyhow::anyhow!("Invalid byte length for port")), } } - -fn listen_to_new_blocks_loop() { - loop { - let eth_newheads_sub = eth::EthAction::SubscribeLogs { - sub_id: 3, - chain_id: CHAIN_ID, - kind: eth::SubscriptionKind::NewHeads, - params: eth::Params::Bool(false), - }; - - match Request::to(("our", "eth", "distro", "sys")) - .body(serde_json::to_vec(ð_newheads_sub).unwrap()) - .send_and_await_response(SUBSCRIPTION_TIMEOUT) - { - Ok(Ok(Message::Response { body, .. })) => { - match serde_json::from_slice::(&body) { - Ok(eth::EthResponse::Ok) => { - print_to_terminal(0, "successfully subscribed to newHeads"); - break; - } - Ok(eth::EthResponse::Err(e)) => { - print_to_terminal(0, &format!("failed to subscribe to new blocks: {e:?}")); - } - _ => { - print_to_terminal(0, "sailed to subscribe to new blocks, weird response."); - } - } - } - _ => { - print_to_terminal(0, "Failed to subscribe to new blocks, no response."); - } - } - - print_to_terminal(0, "retrying new block subscription in 5 seconds..."); - std::thread::sleep(std::time::Duration::from_secs(5)); - } -} diff --git a/kinode/packages/kns_indexer/pkg/manifest.json b/kinode/packages/kns_indexer/pkg/manifest.json index d29393b94..863653260 100644 --- a/kinode/packages/kns_indexer/pkg/manifest.json +++ b/kinode/packages/kns_indexer/pkg/manifest.json @@ -7,11 +7,13 @@ "request_capabilities": [ "eth:distro:sys", "http_server:distro:sys", - "net:distro:sys" + "net:distro:sys", + "timer:distro:sys" ], "grant_capabilities": [ "eth:distro:sys", - "http_server:distro:sys" + "http_server:distro:sys", + "timer:distro:sys" ], "public": false } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index a3add244a..aafefc673 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,6 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -#alloy = { version = "0.1.3", features = [ alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ "json-rpc", "rpc-types", From defe7654f5039bd3e17001b2ff6f0e5b932b56bc Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 22 Aug 2024 04:24:56 +0300 Subject: [PATCH 12/22] app_store UI: hotfix unpublish --- kinode/packages/app_store/ui/src/pages/PublishPage.tsx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kinode/packages/app_store/ui/src/pages/PublishPage.tsx b/kinode/packages/app_store/ui/src/pages/PublishPage.tsx index 2692c5372..bdd28c25e 100644 --- a/kinode/packages/app_store/ui/src/pages/PublishPage.tsx +++ b/kinode/packages/app_store/ui/src/pages/PublishPage.tsx @@ -150,11 +150,12 @@ export default function PublishPage() { address: tba as `0x${string}`, functionName: 'execute', args: [ - KIMAP, + MULTICALL, BigInt(0), multicall, 1 - ] + ], + gas: BigInt(1000000), }); } catch (error) { From d72462dfe3311402d8be8879cbd0f915b8323cb4 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 20:01:42 -0700 Subject: [PATCH 13/22] app_store: delay kns queries by 5s to allow kns time to process block --- kinode/packages/app_store/chain/src/lib.rs | 37 +++++++++++-------- kinode/packages/app_store/pkg/manifest.json | 5 ++- .../kns_indexer/kns_indexer/src/lib.rs | 8 ++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index ca7ae7d3a..e4d3ed5de 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -12,7 +12,7 @@ use alloy_sol_types::SolEvent; use kinode::process::chain::ChainResponses; use kinode_process_lib::{ await_message, call_init, eth, get_blob, get_state, http, kernel_types as kt, kimap, - print_to_terminal, println, Address, Message, PackageId, Request, Response, + print_to_terminal, println, timer, Address, Message, PackageId, Request, Response, }; use std::{ collections::{HashMap, HashSet}, @@ -45,6 +45,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; +const DELAY_MS: u64 = 5_000; + #[derive(Debug, Serialize, Deserialize)] pub struct State { /// the kimap helper we are using @@ -106,7 +108,18 @@ fn init(our: Address) { } fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow::Result<()> { - if message.is_request() { + if !message.is_request() { + if message.is_local(&our) && message.source().process == "timer:distro:sys" { + // handling of ETH RPC subscriptions delayed by DELAY_MS + // to allow kns to have a chance to process block: handle now + let Some(context) = message.context() else { + return Err(anyhow::anyhow!("foo")); + }; + let log = serde_json::from_slice(context)?; + handle_eth_log(our, state, log)?; + return Ok(()); + } + } else { let req: Req = serde_json::from_slice(message.body())?; match req { Req::Eth(eth_result) => { @@ -118,8 +131,10 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } if let Ok(eth::EthSub { result, .. }) = eth_result { - if let eth::SubscriptionResult::Log(log) = result { - handle_eth_log(our, state, *log)?; + if let eth::SubscriptionResult::Log(ref log) = result { + // delay handling of ETH RPC subscriptions by DELAY_MS + // to allow kns to have a chance to process block + timer::set_timer(DELAY_MS, Some(serde_json::to_vec(log)?)); } } else { // attempt to resubscribe @@ -131,21 +146,15 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } Req::Request(chains) => { - handle_local_request(our, state, chains)?; + handle_local_request(state, chains)?; } } - } else { - return Err(anyhow::anyhow!("not a request")); } Ok(()) } -fn handle_local_request( - our: &Address, - state: &mut State, - req: ChainRequests, -) -> anyhow::Result<()> { +fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result<()> { match req { ChainRequests::GetApp(package_id) => { let onchain_app = state @@ -265,9 +274,7 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re // if ~metadata-uri is also empty, this is an unpublish action! if metadata_uri.is_empty() { state.published.remove(&package_id); - if is_our_package { - state.listings.remove(&package_id); - } + state.listings.remove(&package_id); return Ok(()); } return Err(anyhow::anyhow!("metadata hash not found")); diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index c16430201..315d9641f 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -49,7 +49,8 @@ "kns_indexer:kns_indexer:sys", "vfs:distro:sys", "http_client:distro:sys", - "eth:distro:sys" + "eth:distro:sys", + "timer:distro:sys" ], "public": false }, @@ -98,4 +99,4 @@ ], "public": false } -] \ No newline at end of file +] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index cd5059fa3..f62752125 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -401,6 +401,10 @@ fn handle_log( pending_notes: &mut BTreeMap>, log: ð::Log, ) -> anyhow::Result<()> { + if let Some(block) = log.block_number { + state.last_block = block; + } + match log.topics()[0] { kimap::contract::Mint::SIGNATURE_HASH => { let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap(); @@ -459,10 +463,6 @@ fn handle_log( } }; - if let Some(block) = log.block_number { - state.last_block = block; - } - Ok(()) } From 881d4a6b93547ad2d10f0b8c037213f13f5708d0 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 20:07:58 -0700 Subject: [PATCH 14/22] bump to version 0.9.1 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- kinode/Cargo.toml | 2 +- lib/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 760f5844e..dcad6afd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3526,7 +3526,7 @@ dependencies = [ [[package]] name = "kinode" -version = "0.9.0" +version = "0.9.1" dependencies = [ "aes-gcm", "alloy 0.2.1", @@ -3584,7 +3584,7 @@ dependencies = [ [[package]] name = "kinode_lib" -version = "0.9.0" +version = "0.9.1" dependencies = [ "lib", ] diff --git a/Cargo.toml b/Cargo.toml index 1c389fc58..0ce8d435e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode_lib" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 4f2c9be5d..b6ae0bd86 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kinode" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index aafefc673..a11d6d7c0 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lib" authors = ["KinodeDAO"] -version = "0.9.0" +version = "0.9.1" edition = "2021" description = "A general-purpose sovereign cloud computing platform" homepage = "https://kinode.org" From ac078d72ded7d7a8ba71fecb68ed429ad13f7eaa Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 21:30:38 -0700 Subject: [PATCH 15/22] remove some prints & fix a bug --- Cargo.lock | 40 +++++++++---------- kinode/Cargo.toml | 2 +- kinode/packages/app_store/chain/src/lib.rs | 1 - .../packages/app_store/ui/package-lock.json | 2 + .../kns_indexer/kns_indexer/src/lib.rs | 11 +---- kinode/src/eth/subscription.rs | 6 --- lib/Cargo.toml | 2 +- 7 files changed, 26 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcad6afd8..58e0870f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ dependencies = [ [[package]] name = "alloy" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-consensus 0.2.1", "alloy-contract", @@ -173,7 +173,7 @@ dependencies = [ [[package]] name = "alloy-consensus" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-eips 0.2.1", "alloy-primitives", @@ -186,7 +186,7 @@ dependencies = [ [[package]] name = "alloy-contract" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -263,7 +263,7 @@ dependencies = [ [[package]] name = "alloy-eips" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -298,7 +298,7 @@ dependencies = [ [[package]] name = "alloy-genesis" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "alloy-serde 0.2.1", @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -378,7 +378,7 @@ dependencies = [ [[package]] name = "alloy-network" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-consensus 0.2.1", "alloy-eips 0.2.1", @@ -398,7 +398,7 @@ dependencies = [ [[package]] name = "alloy-network-primitives" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "alloy-serde 0.2.1", @@ -462,7 +462,7 @@ dependencies = [ [[package]] name = "alloy-provider" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-chains", "alloy-consensus 0.2.1", @@ -497,7 +497,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-primitives", @@ -558,7 +558,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-primitives", @@ -609,7 +609,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-rpc-types-eth 0.2.1", "alloy-serde 0.2.1", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-consensus 0.2.1", "alloy-eips 0.2.1", @@ -676,7 +676,7 @@ dependencies = [ [[package]] name = "alloy-serde" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "serde", @@ -700,7 +700,7 @@ dependencies = [ [[package]] name = "alloy-signer" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-primitives", "async-trait", @@ -713,7 +713,7 @@ dependencies = [ [[package]] name = "alloy-signer-local" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-consensus 0.2.1", "alloy-network 0.2.1", @@ -838,7 +838,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-json-rpc 0.2.1", "base64 0.22.1", @@ -871,7 +871,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-transport 0.2.1", @@ -885,7 +885,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=c73e70d#c73e70dab6069246bbf20162fbe17c5b11d0c668" +source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" dependencies = [ "alloy-pubsub", "alloy-transport 0.2.1", @@ -3755,7 +3755,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "lib" -version = "0.9.0" +version = "0.9.1" dependencies = [ "alloy 0.2.1", "kit 0.6.8", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index b6ae0bd86..1409ab1b1 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,7 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "bd900b4", features = [ "consensus", "contract", "json-rpc", diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index e4d3ed5de..87a9768e4 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -138,7 +138,6 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow } } else { // attempt to resubscribe - println!("attempting resub"); state .kimap .provider diff --git a/kinode/packages/app_store/ui/package-lock.json b/kinode/packages/app_store/ui/package-lock.json index 54d875285..f65b58a3e 100644 --- a/kinode/packages/app_store/ui/package-lock.json +++ b/kinode/packages/app_store/ui/package-lock.json @@ -3749,6 +3749,8 @@ }, "node_modules/@parcel/watcher-wasm/node_modules/napi-wasm": { "version": "1.1.0", + "resolved": "https://registry.npmjs.org/napi-wasm/-/napi-wasm-1.1.0.tgz", + "integrity": "sha512-lHwIAJbmLSjF9VDRm9GoVOy9AGp3aIvkjv+Kvz9h16QR3uSVYH78PNQUnT2U4X53mhlnV2M7wrhibQ3GHicDmg==", "inBundle": true, "license": "MIT" }, diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 484e8eaaa..068662ecb 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -131,9 +131,6 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { eth_provider.subscribe_loop(1, mints_filter.clone()); eth_provider.subscribe_loop(2, notes_filter.clone()); - // if block in state is < current_block, get logs from that part. - println!("syncing old logs..."); - // if subscription results come back in the wrong order, we store them here // until the right block is reached. @@ -142,6 +139,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // let mut pending_requests: BTreeMap> = BTreeMap::new(); let mut pending_notes: BTreeMap> = BTreeMap::new(); + // if block in state is < current_block, get logs from that part. + println!("syncing old logs..."); fetch_and_process_logs( ð_provider, &mut state, @@ -449,12 +448,6 @@ fn handle_log( .entry(block_number) .or_default() .push((decoded, 0)); - - if !state.listening_newblocks && !pending_notes.is_empty() { - print_to_terminal(0, "subscribing to newHeads..."); - listen_to_new_blocks_loop(); // sub_id: 3 - state.listening_newblocks = true; - } } } _log => { diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 7d2d5ce17..f9240b0a7 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -259,7 +259,6 @@ async fn build_subscription( } let alloy_sub_id = rx.local_id(); let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into(); - println!("{target} making sub {:?}", alloy_sub_id); return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { @@ -395,7 +394,6 @@ async fn maintain_local_subscription( }, value = rx.recv() => { let Ok(value) = value else { - println!("sub failed: {:?}\r", value.unwrap_err()); break; }; let result: SubscriptionResult = match serde_json::from_str(value.get()) { @@ -444,10 +442,6 @@ fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { continue; }; let x = pubsub.unsubscribe(alloy_sub_id); - println!( - "we just tried unsubscribing {:?} unsubscribed: {:?}\r", - alloy_sub_id, x - ); } } diff --git a/lib/Cargo.toml b/lib/Cargo.toml index a11d6d7c0..811a5c9aa 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,7 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [ +alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "bd900b4", features = [ "json-rpc", "rpc-types", "rpc-types-eth", From 290ae3d461ea66ab283b96b919c81f834a367bb9 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 11:21:44 -0700 Subject: [PATCH 16/22] eth: remove dead code; add error print --- kinode/src/eth/subscription.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index f9240b0a7..21cd4a9e0 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -257,8 +257,6 @@ async fn build_subscription( ) .await; } - let alloy_sub_id = rx.local_id(); - let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into(); return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { @@ -389,7 +387,7 @@ async fn maintain_local_subscription( loop { tokio::select! { _ = close_receiver.recv() => { - unsubscribe(rx, &chain_id, providers); + unsubscribe(rx, &chain_id, providers).await; return Ok(()); }, value = rx.recv() => { @@ -424,14 +422,14 @@ async fn maintain_local_subscription( .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - unsubscribe(rx, &chain_id, providers); + unsubscribe(rx, &chain_id, providers).await; Err(EthSubError { id: sub_id, error: format!("subscription ({target}) closed unexpectedly"), }) } -fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { +async fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { let alloy_sub_id = rx.local_id(); let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(chain_id) else { @@ -441,7 +439,14 @@ fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { let Some(pubsub) = url.pubsub.as_ref() else { continue; }; - let x = pubsub.unsubscribe(alloy_sub_id); + if let Err(err) = pubsub.unsubscribe(alloy_sub_id) { + let _ = print_tx + .send(Printout { + verbosity: 0, + content: "unsubscribe from ETH RPC failed".to_string(), + }) + .await; + } } } From e93eaf3ed206361ea34b6450daabb46462c96164 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 11:23:32 -0700 Subject: [PATCH 17/22] kns/app_store: change delay to 1s --- kinode/packages/app_store/chain/src/lib.rs | 2 +- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 87a9768e4..42ef424d1 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -45,7 +45,7 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; -const DELAY_MS: u64 = 5_000; +const DELAY_MS: u64 = 1_000; // 1s #[derive(Debug, Serialize, Deserialize)] pub struct State { diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 068662ecb..2216bc241 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -37,7 +37,7 @@ const KIMAP_FIRST_BLOCK: u64 = 1; // local const MAX_PENDING_ATTEMPTS: u8 = 3; const SUBSCRIPTION_TIMEOUT: u64 = 60; -const NEW_BLOCK_TICK: u64 = 3000; // 3s +const DELAY_MS: u64 = 1_000; // 1s #[derive(Clone, Debug, Serialize, Deserialize)] struct State { @@ -258,7 +258,7 @@ fn handle_eth_message( handle_pending_notes(state, pending_notes)?; if !pending_notes.is_empty() { - timer::set_timer(NEW_BLOCK_TICK, None); + timer::set_timer(DELAY_MS, None); } Ok(()) From b2479fa07b4dddc42824884a31e62fa722549f3b Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 11:27:03 -0700 Subject: [PATCH 18/22] kns: change print verbosity --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 2216bc241..4b93a38b3 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -236,10 +236,7 @@ fn handle_eth_message( } } Ok(Err(e)) => { - print_to_terminal( - 0, - &format!("got eth subscription error ({e:?}), resubscribing"), - ); + println!("got eth subscription error ({e:?}), resubscribing"); if e.id == 1 { eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { @@ -251,7 +248,7 @@ fn handle_eth_message( if tick { let block_number = eth_provider.get_block_number(); if let Ok(block_number) = block_number { - print_to_terminal(1, &format!("new block: {}", block_number)); + print_to_terminal(2, &format!("new block: {}", block_number)); state.last_block = block_number; } } @@ -279,10 +276,7 @@ fn handle_pending_notes( for (note, attempt) in notes.drain(..) { if attempt >= MAX_PENDING_ATTEMPTS { // skip notes that have exceeded max attempts - print_to_terminal( - 1, - &format!("dropping note from block {block} after {attempt} attempts"), - ); + println!("dropping note from block {block} after {attempt} attempts"); continue; } if let Err(e) = handle_note(state, ¬e) { @@ -441,7 +435,7 @@ fn handle_log( } if let Some(block_number) = log.block_number { print_to_terminal( - 0, + 1, &format!("adding note to pending_notes for block {block_number}"), ); pending_notes From 5a63d1a42fe2b1e1357632bacfe17b7b8aa85e76 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 11:40:46 -0700 Subject: [PATCH 19/22] use forked alloy with working unsubscribe --- kinode/Cargo.toml | 2 +- lib/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index 1409ab1b1..eb8d02727 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,7 +26,7 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.3" -alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "bd900b4", features = [ +alloy = { git = "https://github.com/kinode-dao/alloy.git", rev = "e672f3e", features = [ "consensus", "contract", "json-rpc", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 811a5c9aa..704ace6c7 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,7 +15,7 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" } tokio = "1.28" [dependencies] -alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "bd900b4", features = [ +alloy = { git = "https://github.com/kinode-dao/alloy.git", rev = "e672f3e", features = [ "json-rpc", "rpc-types", "rpc-types-eth", From 7678e1e01d96f039d03e05b7a3ac60ed9e3de6d9 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 12:00:01 -0700 Subject: [PATCH 20/22] fix some bugs --- Cargo.lock | 38 +++++++++++++++++----------------- kinode/src/eth/subscription.rs | 15 ++++++++++---- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58e0870f7..e90bd38a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ dependencies = [ [[package]] name = "alloy" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-consensus 0.2.1", "alloy-contract", @@ -173,7 +173,7 @@ dependencies = [ [[package]] name = "alloy-consensus" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-eips 0.2.1", "alloy-primitives", @@ -186,7 +186,7 @@ dependencies = [ [[package]] name = "alloy-contract" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -263,7 +263,7 @@ dependencies = [ [[package]] name = "alloy-eips" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -298,7 +298,7 @@ dependencies = [ [[package]] name = "alloy-genesis" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "alloy-serde 0.2.1", @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -378,7 +378,7 @@ dependencies = [ [[package]] name = "alloy-network" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-consensus 0.2.1", "alloy-eips 0.2.1", @@ -398,7 +398,7 @@ dependencies = [ [[package]] name = "alloy-network-primitives" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "alloy-serde 0.2.1", @@ -462,7 +462,7 @@ dependencies = [ [[package]] name = "alloy-provider" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-chains", "alloy-consensus 0.2.1", @@ -497,7 +497,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-primitives", @@ -558,7 +558,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-primitives", @@ -609,7 +609,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-rpc-types-eth 0.2.1", "alloy-serde 0.2.1", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-consensus 0.2.1", "alloy-eips 0.2.1", @@ -676,7 +676,7 @@ dependencies = [ [[package]] name = "alloy-serde" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "serde", @@ -700,7 +700,7 @@ dependencies = [ [[package]] name = "alloy-signer" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-primitives", "async-trait", @@ -713,7 +713,7 @@ dependencies = [ [[package]] name = "alloy-signer-local" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-consensus 0.2.1", "alloy-network 0.2.1", @@ -838,7 +838,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-json-rpc 0.2.1", "base64 0.22.1", @@ -871,7 +871,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-json-rpc 0.2.1", "alloy-transport 0.2.1", @@ -885,7 +885,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.2.1" -source = "git+https://github.com/bitful-pannul/alloy.git?rev=bd900b4#bd900b4564410c376c811629afcf8e269a0a0896" +source = "git+https://github.com/kinode-dao/alloy.git?rev=e672f3e#e672f3e9be2720a76c1f6aba45243db0187cdccb" dependencies = [ "alloy-pubsub", "alloy-transport 0.2.1", diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 21cd4a9e0..a49cbf972 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -92,6 +92,7 @@ pub async fn create_new_subscription( chain_id, &providers, close_receiver, + &print_tx, ) .await; let Err(e) = r else { @@ -383,11 +384,12 @@ async fn maintain_local_subscription( chain_id: u64, providers: &Providers, mut close_receiver: tokio::sync::mpsc::Receiver, + print_tx: &PrintSender, ) -> Result<(), EthSubError> { loop { tokio::select! { _ = close_receiver.recv() => { - unsubscribe(rx, &chain_id, providers).await; + unsubscribe(rx, &chain_id, providers, print_tx).await; return Ok(()); }, value = rx.recv() => { @@ -422,14 +424,19 @@ async fn maintain_local_subscription( .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - unsubscribe(rx, &chain_id, providers).await; + unsubscribe(rx, &chain_id, providers, print_tx).await; Err(EthSubError { id: sub_id, error: format!("subscription ({target}) closed unexpectedly"), }) } -async fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) { +async fn unsubscribe( + rx: RawSubscription, + chain_id: &u64, + providers: &Providers, + print_tx: &PrintSender, +) { let alloy_sub_id = rx.local_id(); let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(chain_id) else { @@ -443,7 +450,7 @@ async fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) let _ = print_tx .send(Printout { verbosity: 0, - content: "unsubscribe from ETH RPC failed".to_string(), + content: format!("unsubscribe from ETH RPC failed: {err:?}"), }) .await; } From 5f635c73932f171068155a4863001e21014d48bc Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 14:08:34 -0700 Subject: [PATCH 21/22] app_store: retry once on RPCError --- kinode/packages/app_store/chain/src/lib.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 42ef424d1..70c679c09 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -253,7 +253,6 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re // the app store exclusively looks for ~metadata-uri postings: if one is // observed, we then *query* for ~metadata-hash to verify the content // at the URI. - // let metadata_uri = String::from_utf8_lossy(¬e.data).to_string(); let is_our_package = &package_id.publisher() == &our.node(); @@ -263,7 +262,21 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re let hash_note = format!("~metadata-hash.{}", note.parent_path); // owner can change which we don't track (yet?) so don't save, need to get when desired - let (tba, _owner, data) = state.kimap.get(&hash_note).map_err(|e| { + let (tba, _owner, data) = match state.kimap.get(&hash_note) { + Ok(gr) => Ok(gr), + Err(e) => match e { + eth::EthError::RpcError(_) => { + // retry on RpcError after DELAY_MS sleep + // sleep here rather than with, e.g., a message to + // `timer:distro:sys` so that events are processed in + // order of receipt + std::thread::sleep(std::time::Duration::from_millis(DELAY_MS)); + state.kimap.get(&hash_note) + } + _ => Err(e), + }, + } + .map_err(|e| { println!("Couldn't find {hash_note}: {e:?}"); anyhow::anyhow!("metadata hash mismatch") })?; From b5f1759a63a09d864321f980283ae267317dc308 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Thu, 22 Aug 2024 14:12:02 -0700 Subject: [PATCH 22/22] app_store: clean up compiler warnings --- kinode/packages/app_store/app_store/src/http_api.rs | 9 +-------- kinode/packages/app_store/app_store/src/state.rs | 2 +- kinode/packages/app_store/app_store/src/utils.rs | 5 +---- kinode/packages/app_store/chain/src/lib.rs | 5 ----- kinode/packages/app_store/download/src/lib.rs | 2 +- kinode/packages/app_store/downloads/src/lib.rs | 10 +++------- 6 files changed, 7 insertions(+), 26 deletions(-) diff --git a/kinode/packages/app_store/app_store/src/http_api.rs b/kinode/packages/app_store/app_store/src/http_api.rs index 321e9f72a..98ea77a30 100644 --- a/kinode/packages/app_store/app_store/src/http_api.rs +++ b/kinode/packages/app_store/app_store/src/http_api.rs @@ -8,7 +8,7 @@ use crate::{ use kinode_process_lib::{ http::{self, server, Method, StatusCode}, - println, Address, LazyLoadBlob, PackageId, Request, + Address, LazyLoadBlob, PackageId, Request, }; use kinode_process_lib::{SendError, SendErrorKind}; use serde_json::json; @@ -226,13 +226,6 @@ fn get_package_id(url_params: &HashMap) -> anyhow::Result) -> anyhow::Result { - let Some(version_hash) = url_params.get("version_hash") else { - return Err(anyhow::anyhow!("Missing version_hash")); - }; - Ok(version_hash.to_string()) -} - fn gen_package_info(id: &PackageId, state: &PackageState) -> serde_json::Value { // installed package info json!({ diff --git a/kinode/packages/app_store/app_store/src/state.rs b/kinode/packages/app_store/app_store/src/state.rs index 23619a026..5027c7a4b 100644 --- a/kinode/packages/app_store/app_store/src/state.rs +++ b/kinode/packages/app_store/app_store/src/state.rs @@ -1,5 +1,5 @@ use crate::{utils, VFS_TIMEOUT}; -use kinode_process_lib::{kimap, println, vfs, PackageId}; +use kinode_process_lib::{kimap, vfs, PackageId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; diff --git a/kinode/packages/app_store/app_store/src/utils.rs b/kinode/packages/app_store/app_store/src/utils.rs index 6bf4bf200..e04e42b6d 100644 --- a/kinode/packages/app_store/app_store/src/utils.rs +++ b/kinode/packages/app_store/app_store/src/utils.rs @@ -11,10 +11,7 @@ use { get_blob, kernel_types as kt, println, vfs, Address, LazyLoadBlob, PackageId, ProcessId, Request, }, - std::{ - collections::{HashMap, HashSet}, - str::FromStr, - }, + std::collections::{HashMap, HashSet}, }; // quite annoyingly, we must convert from our gen'd version of PackageId diff --git a/kinode/packages/app_store/chain/src/lib.rs b/kinode/packages/app_store/chain/src/lib.rs index 70c679c09..0d596c543 100644 --- a/kinode/packages/app_store/chain/src/lib.rs +++ b/kinode/packages/app_store/chain/src/lib.rs @@ -40,11 +40,6 @@ const KIMAP_ADDRESS: &'static str = kimap::KIMAP_ADDRESS; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_ADDRESS: &str = "0xcA92476B2483aBD5D82AEBF0b56701Bb2e9be658"; -#[cfg(not(feature = "simulation-mode"))] -const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; -#[cfg(feature = "simulation-mode")] -const KIMAP_FIRST_BLOCK: u64 = 1; - const DELAY_MS: u64 = 1_000; // 1s #[derive(Debug, Serialize, Deserialize)] diff --git a/kinode/packages/app_store/download/src/lib.rs b/kinode/packages/app_store/download/src/lib.rs index cba03b02e..a777f5f6f 100644 --- a/kinode/packages/app_store/download/src/lib.rs +++ b/kinode/packages/app_store/download/src/lib.rs @@ -62,7 +62,7 @@ fn init(our: Address) { }; match response { - DownloadResponses::Error(e) => { + DownloadResponses::Error(_e) => { println!("download: error"); } DownloadResponses::Success => { diff --git a/kinode/packages/app_store/downloads/src/lib.rs b/kinode/packages/app_store/downloads/src/lib.rs index e6f63f4e5..4048bede4 100644 --- a/kinode/packages/app_store/downloads/src/lib.rs +++ b/kinode/packages/app_store/downloads/src/lib.rs @@ -7,17 +7,13 @@ use crate::kinode::process::downloads::{ DownloadResponses, Entry, FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest, }; -use std::{ - collections::{HashMap, HashSet}, - io::Read, - str::FromStr, -}; +use std::{collections::HashSet, io::Read, str::FromStr}; use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer}; use kinode_process_lib::{ await_message, call_init, get_blob, get_state, http::client, - kernel_types as kt, print_to_terminal, println, set_state, + print_to_terminal, println, set_state, vfs::{self, Directory, File}, Address, Message, PackageId, ProcessId, Request, Response, }; @@ -113,7 +109,7 @@ fn handle_message( state: &mut State, message: &Message, downloads: &mut Directory, - tmp: &mut Directory, + _tmp: &mut Directory, auto_updates: &mut HashSet<(PackageId, String)>, ) -> anyhow::Result<()> { if message.is_request() {