Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 290 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion kinode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ simulation-mode = []

[dependencies]
aes-gcm = "0.10.3"
alloy = { version = "0.1.3", features = [
#alloy = { version = "0.1.3", features = [
alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [
"consensus",
"contract",
"json-rpc",
Expand Down
2 changes: 2 additions & 0 deletions kinode/packages/app_store/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
}
} else {
// attempt to resubscribe
println!("attempting resub");
state
.kimap
.provider
Expand Down Expand Up @@ -341,6 +342,7 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) {
let filter = app_store_filter(state);
// get past logs, subscribe to new ones.
// subscribe first so we don't miss any logs
println!("subscribing...");
state.kimap.provider.subscribe_loop(1, filter.clone());
for log in fetch_logs(
&state.kimap.provider,
Expand Down
2 changes: 2 additions & 0 deletions kinode/packages/app_store/ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 8 additions & 10 deletions kinode/packages/kns_indexer/kns_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
}
}

if !state.listening_newblocks
&& (!pending_requests.is_empty() || !pending_notes.is_empty())
{
print_to_terminal(0, "subscribing to newHeads...");
if !state.listening_newblocks && !pending_requests.is_empty() {
print_to_terminal(0, "subscribing to newHeads for req...");
listen_to_new_blocks_loop(); // sub_id: 3
state.listening_newblocks = true;
}
Expand Down Expand Up @@ -269,6 +267,7 @@ fn handle_eth_message(
} else if e.id == 2 {
eth_provider.subscribe_loop(2, notes_filter.clone());
} else if e.id == 3 {
print_to_terminal(0, "subscribing to newHeads for retry...");
listen_to_new_blocks_loop();
}
}
Expand Down Expand Up @@ -527,16 +526,15 @@ fn handle_log(
.entry(block_number)
.or_default()
.push((decoded, 0));
if !state.listening_newblocks {
print_to_terminal(0, "subscribing to newHeads for note...");
listen_to_new_blocks_loop(); // sub_id: 3
state.listening_newblocks = true;
}
}
}
},
}

if !state.listening_newblocks && !pending_notes.is_empty() {
print_to_terminal(0, "subscribing to newHeads...");
listen_to_new_blocks_loop(); // sub_id: 3
state.listening_newblocks = true;
}
}
}
_log => {
Expand Down
10 changes: 8 additions & 2 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,19 @@ async fn handle_eth_action(
verbose_print(
&state.print_tx,
&format!(
"eth: handling {} from {}",
"eth: handling {} from {}; active_subs len: {:?}",
//"eth: handling {} from {}",
match &eth_action {
EthAction::SubscribeLogs { .. } => "subscribe",
EthAction::UnsubscribeLogs(_) => "unsubscribe",
EthAction::Request { .. } => "request",
},
km.source
km.source,
state
.active_subscriptions
.iter()
.map(|v| v.len())
.collect::<Vec<_>>(),
),
)
.await;
Expand Down
38 changes: 25 additions & 13 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ async fn build_subscription(
)
.await;
}
let alloy_sub_id = rx.local_id();
let alloy_sub_id: alloy::primitives::U256 = alloy_sub_id.clone().into();
println!("{target} making sub {:?}", alloy_sub_id);
return Ok(Ok((rx, chain_id)));
}
Err(rpc_error) => {
Expand Down Expand Up @@ -387,22 +390,12 @@ async fn maintain_local_subscription(
loop {
tokio::select! {
_ = close_receiver.recv() => {
let alloy_sub_id = rx.local_id();
let alloy_sub_id = alloy_sub_id.clone().into();
let Some(chain_providers) = providers.get_mut(&chain_id) else {
return Ok(()); //?
};
for url in chain_providers.urls.iter() {
let Some(pubsub) = url.pubsub.as_ref() else {
continue;
};
let x = pubsub.unsubscribe(alloy_sub_id);
println!("we just tried unsubscribing unsubscribed: {:?}", x);
}
unsubscribe(rx, &chain_id, providers);
return Ok(());
},
value = rx.recv() => {
let Ok(value) = value else {
println!("sub failed: {:?}\r", value.unwrap_err());
break;
};
let result: SubscriptionResult = match serde_json::from_str(value.get()) {
Expand Down Expand Up @@ -433,12 +426,31 @@ async fn maintain_local_subscription(
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
unsubscribe(rx, &chain_id, providers);
Err(EthSubError {
id: sub_id,
error: "subscription closed unexpectedly".to_string(),
error: format!("subscription ({target}) closed unexpectedly"),
})
}

fn unsubscribe(rx: RawSubscription, chain_id: &u64, providers: &Providers) {
let alloy_sub_id = rx.local_id();
let alloy_sub_id = alloy_sub_id.clone().into();
let Some(chain_providers) = providers.get_mut(chain_id) else {
return; //?
};
for url in chain_providers.urls.iter() {
let Some(pubsub) = url.pubsub.as_ref() else {
continue;
};
let x = pubsub.unsubscribe(alloy_sub_id);
println!(
"we just tried unsubscribing {:?} unsubscribed: {:?}\r",
alloy_sub_id, x
);
}
}

/// handle the subscription updates from a remote provider,
/// and also perform keepalive checks on that provider.
/// current keepalive is 30s, this can be adjusted as desired
Expand Down
3 changes: 2 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ kit = { git = "https://github.com/kinode-dao/kit", tag = "v0.6.8" }
tokio = "1.28"

[dependencies]
alloy = { version = "0.1.3", features = [
#alloy = { version = "0.1.3", features = [
alloy = { git = "https://github.com/bitful-pannul/alloy.git", rev = "c73e70d", features = [
"json-rpc",
"rpc-types",
"rpc-types-eth",
Expand Down