diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 190e97395..cec64a51a 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -322,6 +322,7 @@ async fn handle_message( ) .await; } + Ok(()) } Message::Request(req) => { let timeout = req.expects_response.unwrap_or(60); @@ -330,7 +331,7 @@ async fn handle_message( }; match req { IncomingReq::EthAction(eth_action) => { - return handle_eth_action(state, km, timeout, eth_action).await; + handle_eth_action(state, km, timeout, eth_action).await } IncomingReq::EthConfigAction(eth_config_action) => { kernel_message( @@ -344,29 +345,47 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::EthSubResult(eth_sub_result) => { // forward this to rsvp, if we have the sub id in our active subs let Some(rsvp) = km.rsvp else { + verbose_print( + &state.print_tx, + "eth: got eth_sub_result with no rsvp, ignoring", + ) + .await; return Ok(()); // no rsvp, no need to forward }; let sub_id = match eth_sub_result { Ok(EthSub { id, .. }) => id, Err(EthSubError { id, .. }) => id, }; - if let Some(sub_map) = state.active_subscriptions.get(&rsvp) { - if let Some(ActiveSub::Remote { - provider_node, - sender, - .. - }) = sub_map.get(&sub_id) - { - if provider_node == &km.source.node { - if let Ok(()) = sender.send(eth_sub_result).await { - // successfully sent a subscription update from a - // remote provider to one of our processes - return Ok(()); + if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) { + if let Some(sub) = sub_map.get(&sub_id) { + if let ActiveSub::Remote { + provider_node, + sender, + .. + } = sub + { + if provider_node == &km.source.node { + if let Ok(()) = sender.send(eth_sub_result).await { + // successfully sent a subscription update from a + // remote provider to one of our processes + return Ok(()); + } } + // failed to send subscription update to process, + // unsubscribe from provider and close + verbose_print( + &state.print_tx, + "eth: got eth_sub_result but provider node did not match or local sub was already closed", + ) + .await; + sub.close(sub_id, state).await; + sub_map.remove(&sub_id); + return Ok(()); } } } @@ -374,13 +393,16 @@ async fn handle_message( // so they can stop sending us updates verbose_print( &state.print_tx, - "eth: got eth_sub_result but no matching sub found, unsubscribing", + &format!( + "eth: got eth_sub_result but no matching sub {} found, unsubscribing", + sub_id + ), ) .await; kernel_message( - &state.our.clone(), + &state.our, km.id, - km.source.clone(), + km.source, None, true, None, @@ -388,6 +410,7 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } IncomingReq::SubKeepalive(sub_id) => { // source expects that we have a local sub for them with this id @@ -420,11 +443,11 @@ async fn handle_message( &state.send_to_loop, ) .await; + Ok(()) } } } } - Ok(()) } async fn handle_eth_action( @@ -479,12 +502,32 @@ async fn handle_eth_action( .await; } EthAction::UnsubscribeLogs(sub_id) => { - let mut sub_map = state - .active_subscriptions - .entry(km.source.clone()) - .or_insert(HashMap::new()); + let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else { + verbose_print( + &state.print_tx, + &format!( + "eth: got unsubscribe from {} but no subscription found", + km.source + ), + ) + .await; + error_message( + &state.our, + km.id, + km.source, + EthError::MalformedRequest, + &state.send_to_loop, + ) + .await; + return Ok(()); + }; if let Some(sub) = sub_map.remove(&sub_id) { sub.close(sub_id, state).await; + verbose_print( + &state.print_tx, + &format!("eth: closed subscription {} for {}", sub_id, km.source.node), + ) + .await; kernel_message( &state.our, km.id, @@ -499,7 +542,10 @@ async fn handle_eth_action( } else { verbose_print( &state.print_tx, - "eth: got unsubscribe but no matching subscription found", + &format!( + "eth: got unsubscribe from {} but no subscription {} found", + km.source, sub_id + ), ) .await; error_message( diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs index 781f84415..21e78021c 100644 --- a/kinode/src/eth/subscription.rs +++ b/kinode/src/eth/subscription.rs @@ -113,7 +113,6 @@ pub async fn create_new_subscription( let (keepalive_err_sender, keepalive_err_receiver) = tokio::sync::mpsc::channel(1); response_channels.insert(keepalive_km_id, keepalive_err_sender); - let response_channels = response_channels.clone(); subs.insert( remote_sub_id, ActiveSub::Remote { @@ -471,7 +470,7 @@ async fn maintain_remote_subscription( true, Some(30), IncomingReq::SubKeepalive(remote_sub_id), - &send_to_loop, + send_to_loop, ).await; } _incoming = net_error_rx.recv() => { @@ -488,6 +487,23 @@ async fn maintain_remote_subscription( } } }; + // tell provider node we don't need their services anymore + // (in case they did not close the subscription on their side, + // such as in the 2-hour timeout case) + kernel_message( + our, + rand::random(), + Address { + node: provider_node.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + None, + true, + None, + EthAction::UnsubscribeLogs(remote_sub_id), + send_to_loop, + ) + .await; active_subscriptions .entry(target.clone()) .and_modify(|sub_map| {