From f8a7e8fc5399ec2d201fbe28fabb8aefa5f9fcd3 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 08:14:36 -0700 Subject: [PATCH 1/4] kns: sub for notes --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index aef9788e3..8b32e25c2 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -531,6 +531,12 @@ fn handle_log( } }, } + + if !state.listening_newblocks && !pending_notes.is_empty() { + print_to_terminal(0, "subscribing to newHeads..."); + listen_to_new_blocks_loop(); // sub_id: 3 + state.listening_newblocks = true; + } } } _log => { From fe50f314d96d0c2e9f52fe6fda50d42b4eef0a07 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Wed, 21 Aug 2024 09:48:49 -0700 Subject: [PATCH 2/4] add wip --- kinode/src/eth/mod.rs | 7 +- kinode/src/eth/subscription.rs | 149 +++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 58 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 77812ea2d..f6c67d852 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -100,7 +100,7 @@ type ResponseChannels = Arc>; #[derive(Debug)] enum ActiveSub { - Local(JoinHandle<()>), + Local((tokio::sync::mpsc::Sender, JoinHandle<()>)), Remote { provider_node: String, handle: JoinHandle<()>, @@ -111,8 +111,9 @@ enum ActiveSub { impl ActiveSub { async fn close(&self, sub_id: u64, state: &ModuleState) { match self { - ActiveSub::Local(handle) => { - handle.abort(); + ActiveSub::Local((close_sender, _handle)) => { + close_sender.send(true).await.unwrap(); + //handle.abort(); } ActiveSub::Remote { provider_node, diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 21e78021c..7a8ed7da9 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,4 +1,5 @@ use crate::eth::*; +use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -70,40 +71,51 @@ pub async fn create_new_subscription( let send_to_loop = send_to_loop.clone(); let print_tx = print_tx.clone(); let active_subscriptions = active_subscriptions.clone(); + let providers = providers.clone(); + let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1); match maybe_raw_sub { - Ok(rx) => { + Ok((rx, chain_id)) => { subs.insert( sub_id, // this is a local sub, as in, we connect to the rpc endpoint - ActiveSub::Local(tokio::spawn(async move { - // await the subscription error and kill it if so - let e = maintain_local_subscription( - &our, - sub_id, - rx, - &target, - &rsvp, - &send_to_loop, - &active_subscriptions, - ) - .await; - verbose_print( - &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), - ) - .await; - kernel_message( - &our, - rand::random(), - target.clone(), - rsvp, - true, - None, - EthSubResult::Err(e), - &send_to_loop, - ) - .await; - })), + ActiveSub::Local(( + close_sender, + tokio::spawn(async move { + // await the subscription error and kill it if so + let r = maintain_local_subscription( + &our, + sub_id, + rx, + &target, + &rsvp, + &send_to_loop, + &active_subscriptions, + chain_id, + &providers, + close_receiver, + ) + .await; + let Err(e) = r else { + return; + }; + verbose_print( + &print_tx, + &format!("eth: closed local subscription due to error {e:?}"), + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + rsvp, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + }), + )), ); } Err((provider_node, remote_sub_id)) => { @@ -169,7 +181,7 @@ async fn build_subscription( providers: &Providers, response_channels: &ResponseChannels, print_tx: &PrintSender, -) -> Result, EthError> { +) -> Result, EthError> { let EthAction::SubscribeLogs { chain_id, kind, @@ -244,7 +256,7 @@ async fn build_subscription( ) .await; } - return Ok(Ok(rx)); + return Ok(Ok((rx, chain_id))); } Err(rpc_error) => { verbose_print( @@ -367,38 +379,65 @@ async fn maintain_local_subscription( rsvp: &Option
, send_to_loop: &MessageSender, active_subscriptions: &ActiveSubscriptions, -) -> EthSubError { - while let Ok(value) = rx.recv().await { - let result: SubscriptionResult = match serde_json::from_str(value.get()) { - Ok(res) => res, - Err(e) => { - return EthSubError { - id: sub_id, - error: e.to_string(), + chain_id: u64, + providers: &Providers, + mut close_receiver: tokio::sync::mpsc::Receiver, +) -> Result<(), EthSubError> { + loop { + tokio::select! { + _ = close_receiver.recv() => { + let alloy_sub_id = rx.local_id(); + let alloy_sub_id = alloy_sub_id.into(); + //let alloy_sub_id = rx.local_id().; + //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); + //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let Some(chain_providers) = providers.get_mut(&chain_id) else { + return Ok(()); //? + }; + for url in chain_providers.urls { + let Some(pubsub) = url.pubsub else { + continue; + }; + pubsub.unsubscribe(alloy_sub_id); } - } - }; - kernel_message( - our, - rand::random(), - target.clone(), - rsvp.clone(), - true, - None, - EthSubResult::Ok(EthSub { id: sub_id, result }), - &send_to_loop, - ) - .await; + return Ok(()); + }, + value = rx.recv() => { + let Ok(value) = value else { + break; + }; + let result: SubscriptionResult = match serde_json::from_str(value.get()) { + Ok(res) => res, + Err(e) => { + return Err(EthSubError { + id: sub_id, + error: e.to_string(), + }); + } + }; + kernel_message( + our, + rand::random(), + target.clone(), + rsvp.clone(), + true, + None, + EthSubResult::Ok(EthSub { id: sub_id, result }), + &send_to_loop, + ) + .await; + }, + } } active_subscriptions .entry(target.clone()) .and_modify(|sub_map| { sub_map.remove(&sub_id); }); - EthSubError { + Err(EthSubError { id: sub_id, error: "subscription closed unexpectedly".to_string(), - } + }) } /// handle the subscription updates from a remote provider, From 9c20cf5c7c1411fc2ae52006805b8a798249fe36 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:49:14 +0000 Subject: [PATCH 3/4] Format Rust code using rustfmt --- kinode/src/eth/subscription.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 7a8ed7da9..cdc0bd466 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -100,7 +100,9 @@ pub async fn create_new_subscription( }; verbose_print( &print_tx, - &format!("eth: closed local subscription due to error {e:?}"), + &format!( + "eth: closed local subscription due to error {e:?}" + ), ) .await; kernel_message( From e1106b40973a62f5aaf9463135a96f389f98b278 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 21 Aug 2024 20:01:45 +0300 Subject: [PATCH 4/4] eth: borrow fixes unsubscribe --- kinode/src/eth/subscription.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index cdc0bd466..d78eac99a 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -1,5 +1,4 @@ use crate::eth::*; -use alloy::primitives::{B256, U256}; use alloy::pubsub::RawSubscription; use alloy::rpc::types::eth::pubsub::SubscriptionResult; @@ -389,18 +388,16 @@ async fn maintain_local_subscription( tokio::select! { _ = close_receiver.recv() => { let alloy_sub_id = rx.local_id(); - let alloy_sub_id = alloy_sub_id.into(); - //let alloy_sub_id = rx.local_id().; - //let alloy_sub_id = alloy::primitives::U256::from(alloy_sub_id.clone()); - //let alloy_sub_id: alloy::primitives::Uint<256, 4> = alloy::primitives::Uint::from(alloy_sub_id); + let alloy_sub_id = alloy_sub_id.clone().into(); let Some(chain_providers) = providers.get_mut(&chain_id) else { return Ok(()); //? }; - for url in chain_providers.urls { - let Some(pubsub) = url.pubsub else { + for url in chain_providers.urls.iter() { + let Some(pubsub) = url.pubsub.as_ref() else { continue; }; - pubsub.unsubscribe(alloy_sub_id); + let x = pubsub.unsubscribe(alloy_sub_id); + println!("we just tried unsubscribing unsubscribed: {:?}", x); } return Ok(()); },