Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 68 additions & 22 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ async fn handle_message(
)
.await;
}
Ok(())
}
Message::Request(req) => {
let timeout = req.expects_response.unwrap_or(60);
Expand All @@ -330,7 +331,7 @@ async fn handle_message(
};
match req {
IncomingReq::EthAction(eth_action) => {
return handle_eth_action(state, km, timeout, eth_action).await;
handle_eth_action(state, km, timeout, eth_action).await
}
IncomingReq::EthConfigAction(eth_config_action) => {
kernel_message(
Expand All @@ -344,50 +345,72 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::EthSubResult(eth_sub_result) => {
// forward this to rsvp, if we have the sub id in our active subs
let Some(rsvp) = km.rsvp else {
verbose_print(
&state.print_tx,
"eth: got eth_sub_result with no rsvp, ignoring",
)
.await;
return Ok(()); // no rsvp, no need to forward
};
let sub_id = match eth_sub_result {
Ok(EthSub { id, .. }) => id,
Err(EthSubError { id, .. }) => id,
};
if let Some(sub_map) = state.active_subscriptions.get(&rsvp) {
if let Some(ActiveSub::Remote {
provider_node,
sender,
..
}) = sub_map.get(&sub_id)
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
}
}
// failed to send subscription update to process,
// unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match or local sub was already closed",
)
.await;
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
return Ok(());
}
}
}
// tell the remote provider that we don't have this sub
// so they can stop sending us updates
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but no matching sub found, unsubscribing",
&format!(
"eth: got eth_sub_result but no matching sub {} found, unsubscribing",
sub_id
),
)
.await;
kernel_message(
&state.our.clone(),
&state.our,
km.id,
km.source.clone(),
km.source,
None,
true,
None,
EthAction::UnsubscribeLogs(sub_id),
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::SubKeepalive(sub_id) => {
// source expects that we have a local sub for them with this id
Expand Down Expand Up @@ -420,11 +443,11 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
}
}
}
Ok(())
}

async fn handle_eth_action(
Expand Down Expand Up @@ -479,12 +502,32 @@ async fn handle_eth_action(
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let mut sub_map = state
.active_subscriptions
.entry(km.source.clone())
.or_insert(HashMap::new());
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
if let Some(sub) = sub_map.remove(&sub_id) {
sub.close(sub_id, state).await;
verbose_print(
&state.print_tx,
&format!("eth: closed subscription {} for {}", sub_id, km.source.node),
)
.await;
kernel_message(
&state.our,
km.id,
Expand All @@ -499,7 +542,10 @@ async fn handle_eth_action(
} else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
&format!(
"eth: got unsubscribe from {} but no subscription {} found",
km.source, sub_id
),
)
.await;
error_message(
Expand Down
20 changes: 18 additions & 2 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
let response_channels = response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
Expand Down Expand Up @@ -471,7 +470,7 @@ async fn maintain_remote_subscription(
true,
Some(30),
IncomingReq::SubKeepalive(remote_sub_id),
&send_to_loop,
send_to_loop,
).await;
}
_incoming = net_error_rx.recv() => {
Expand All @@ -488,6 +487,23 @@ async fn maintain_remote_subscription(
}
}
};
// tell provider node we don't need their services anymore
// (in case they did not close the subscription on their side,
// such as in the 2-hour timeout case)
kernel_message(
our,
rand::random(),
Address {
node: provider_node.to_string(),
process: ETH_PROCESS_ID.clone(),
},
None,
true,
None,
EthAction::UnsubscribeLogs(remote_sub_id),
send_to_loop,
)
.await;
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
Expand Down