diff --git a/Cargo.lock b/Cargo.lock index 1bc1f703e..bd1476831 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3482,12 +3482,12 @@ dependencies = [ "alloy-primitives", "alloy-sol-types", "anyhow", - "bincode", "hex", "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)", "rmp-serde", "serde", "serde_json", + "thiserror", "wit-bindgen", ] diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 437acb47e..dae91ceb7 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -10,12 +10,12 @@ simulation-mode = [] anyhow = "1.0" alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" -bincode = "1.3.3" hex = "0.4.3" kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.9.0" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +thiserror = "1.0" wit-bindgen = "0.24.0" [lib] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 85ddf42e9..5e14c5475 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -35,6 +35,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; // local +const MAX_PENDING_ATTEMPTS: u8 = 3; + #[derive(Clone, Debug, Serialize, Deserialize)] struct State { chain_id: u64, @@ -43,7 +45,6 @@ struct State { // namehash to human readable name names: HashMap, // human readable name to most recent on-chain routing information as json - // TODO: optional params knsUpdate? also include tba. nodes: HashMap, // last block we have an update from last_block: u64, @@ -57,6 +58,12 @@ enum IndexerResponses { GetState(State), } +#[derive(Debug, thiserror::Error)] +enum KnsError { + #[error("Parent node for note not found")] + NoParentError, +} + call_init!(init); fn init(our: Address) { println!("indexing on contract address {KIMAP_ADDRESS}"); @@ -125,28 +132,40 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if block in state is < current_block, get logs from that part. println!("syncing old logs..."); - fetch_and_process_logs(ð_provider, &our, &mut state, mints_filter.clone()); - fetch_and_process_logs(ð_provider, &our, &mut state, notes_filter.clone()); - println!("done syncing old logs."); + // if subscription results come back in the wrong order, we store them here + // until the right block is reached. let mut pending_requests: BTreeMap> = BTreeMap::new(); + let mut pending_notes: BTreeMap> = BTreeMap::new(); + + fetch_and_process_logs( + ð_provider, + &mut state, + mints_filter.clone(), + &mut pending_notes, + ); + fetch_and_process_logs( + ð_provider, + &mut state, + notes_filter.clone(), + &mut pending_notes, + ); + println!("done syncing old logs."); loop { let Ok(message) = await_message() else { continue; }; let Message::Request { source, body, .. } = message else { - // TODO we could store the subscription ID for eth - // in case we want to cancel/reset it continue; }; if source.process == "eth:distro:sys" { handle_eth_message( - &our, &mut state, ð_provider, &mut pending_requests, + &mut pending_notes, &body, &mints_filter, ¬es_filter, @@ -205,10 +224,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } fn handle_eth_message( - our: &Address, state: &mut State, eth_provider: ð::Provider, pending_requests: &mut BTreeMap>, + pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, notes_filter: ð::Filter, @@ -216,19 +235,24 @@ fn handle_eth_message( match serde_json::from_slice::(body) { Ok(Ok(eth::EthSub { result, .. })) => { if let eth::SubscriptionResult::Log(log) = result { - if let Err(e) = handle_log(our, state, &log) { - // print errors at verbosity=1 + if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } } else if let eth::SubscriptionResult::Header(header) = result { if let Some(block) = header.number { // risque.. + // pending_requests/notes are kicked off with block numbers + // that are ahead of state.last_block. can be risky if event subscriptions and newHeads + // are completely out of sync. state.last_block = block; } } } Ok(Err(e)) => { - println!("got eth subscription error ({e:?}), resubscribing"); + print_to_terminal( + 1, + &format!("got eth subscription error ({e:?}), resubscribing"), + ); if e.id == 1 { eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { @@ -243,8 +267,7 @@ fn handle_eth_message( } handle_pending_requests(state, pending_requests)?; - - // set_state(&bincode::serialize(state)?); + handle_pending_notes(state, pending_notes)?; Ok(()) } @@ -295,11 +318,147 @@ fn handle_pending_requests( for block in blocks_to_remove.iter() { pending_requests.remove(block); } + + Ok(()) +} + +fn handle_pending_notes( + state: &mut State, + pending_notes: &mut BTreeMap>, +) -> anyhow::Result<()> { + if pending_notes.is_empty() { + return Ok(()); + } + let mut blocks_to_remove = vec![]; + + for (block, notes) in pending_notes.iter_mut() { + if *block < state.last_block { + let mut keep_notes = Vec::new(); + for (note, attempt) in notes.drain(..) { + if attempt >= MAX_PENDING_ATTEMPTS { + // skip notes that have exceeded max attempts + print_to_terminal( + 1, + &format!("dropping note from block {block} after {attempt} attempts"), + ); + continue; + } + if let Err(e) = handle_note(state, ¬e) { + match e.downcast_ref::() { + None => { + print_to_terminal(1, &format!("pending note handling error: {e:?}")) + } + Some(ee) => match ee { + KnsError::NoParentError => { + print_to_terminal( + 1, + &format!("note still awaiting mint; attempt {attempt}"), + ); + keep_notes.push((note, attempt + 1)); + } + }, + } + } + } + if keep_notes.is_empty() { + blocks_to_remove.push(*block); + } else { + *notes = keep_notes; + } + } + } + + // remove processed blocks + for block in blocks_to_remove { + pending_notes.remove(&block); + } + + Ok(()) +} + +fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Result<()> { + let note_label = String::from_utf8(note.label.to_vec())?; + let node_hash = note.parenthash.to_string(); + + if !kimap::valid_note(¬e_label) { + return Err(anyhow::anyhow!("skipping invalid note: {note_label}")); + } + + let Some(node_name) = get_parent_name(&state.names, &node_hash) else { + return Err(KnsError::NoParentError.into()); + }; + + match note_label.as_str() { + "~ws-port" => { + let ws = bytes_to_port(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ports.insert("ws".to_string(), ws); + // port defined, -> direct + node.routers = vec![]; + } + } + "~tcp-port" => { + let tcp = bytes_to_port(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ports.insert("tcp".to_string(), tcp); + // port defined, -> direct + node.routers = vec![]; + } + } + "~net-key" => { + if note.data.len() != 32 { + return Err(anyhow::anyhow!("invalid net-key length")); + } + if let Some(node) = state.nodes.get_mut(&node_name) { + node.public_key = hex::encode(¬e.data); + } + } + "~routers" => { + let routers = decode_routers(¬e.data, state); + if let Some(node) = state.nodes.get_mut(&node_name) { + node.routers = routers; + // -> indirect + node.ports = BTreeMap::new(); + node.ips = vec![]; + } + } + "~ip" => { + let ip = bytes_to_ip(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ips = vec![ip.to_string()]; + // -> direct + node.routers = vec![]; + } + } + _other => { + // Ignore unknown notes + } + } + + // only send an update if we have a *full* set of data for networking: + // a node name, plus either or + if let Some(node_info) = state.nodes.get(&node_name) { + if !node_info.public_key.is_empty() + && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) + || node_info.routers.len() > 0) + { + Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( + node_info.clone(), + ))?) + .send()?; + } + } + Ok(()) } -fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Result<()> { - let node_name = match log.topics()[0] { +fn handle_log( + state: &mut State, + pending_notes: &mut BTreeMap>, + log: ð::Log, +) -> anyhow::Result<()> { + match log.topics()[0] { kimap::contract::Mint::SIGNATURE_HASH => { let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap(); let parent_hash = decoded.parenthash.to_string(); @@ -326,72 +485,33 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul routers: Vec::new(), }, ); - full_name } kimap::contract::Note::SIGNATURE_HASH => { let decoded = kimap::contract::Note::decode_log_data(log.data(), true).unwrap(); - - let note = String::from_utf8(decoded.label.to_vec())?; - let node_hash = decoded.parenthash.to_string(); + let note: String = String::from_utf8(decoded.label.to_vec())?; if !kimap::valid_note(¬e) { return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - let Some(node_name) = get_parent_name(&state.names, &node_hash) else { - return Err(anyhow::anyhow!("parent node for note not found")); - }; - - match note.as_str() { - "~ws-port" => { - let ws = bytes_to_port(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ports.insert("ws".to_string(), ws); - // port defined, -> direct - node.routers = vec![]; - } - } - "~tcp-port" => { - let tcp = bytes_to_port(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ports.insert("tcp".to_string(), tcp); - // port defined, -> direct - node.routers = vec![]; - } - } - "~net-key" => { - if decoded.data.len() != 32 { - return Err(anyhow::anyhow!("invalid net-key length")); - } - if let Some(node) = state.nodes.get_mut(&node_name) { - node.public_key = decoded.data.to_string(); - } - } - "~routers" => { - let routers = decode_routers(&decoded.data, &state); - if let Some(node) = state.nodes.get_mut(&node_name) { - node.routers = routers; - // -> indirect - node.ports = BTreeMap::new(); - node.ips = vec![]; - }; - } - "~ip" => { - let ip = bytes_to_ip(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ips = vec![ip.to_string()]; - // -> direct - node.routers = vec![]; - }; - } - _other => { - // println!("unknown note: {other}"); + if let Err(e) = handle_note(state, &decoded) { + match e.downcast_ref::() { + None => print_to_terminal(1, &format!("note handling error: {e:?}")), + Some(ee) => match ee { + KnsError::NoParentError => { + print_to_terminal(1, &format!("note awaiting mint: place in pending")); + if let Some(block_number) = log.block_number { + pending_notes + .entry(block_number) + .or_default() + .push((decoded, 0)); + } + } + }, } } - node_name } _log => { - // println!("unknown log: {log:?}"); return Ok(()); } }; @@ -400,21 +520,6 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul state.last_block = block; } - // only send an update if we have a *full* set of data for networking: - // a node name, plus either or - - if let Some(node_info) = state.nodes.get(&node_name) { - if !node_info.public_key.is_empty() - && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) - || node_info.routers.len() > 0) - { - Request::to((&our.node, "net", "distro", "sys")) - .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( - node_info.clone(), - ))?) - .send()?; - } - } Ok(()) } @@ -422,16 +527,16 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul fn fetch_and_process_logs( eth_provider: ð::Provider, - our: &Address, state: &mut State, filter: eth::Filter, + pending_notes: &mut BTreeMap>, ) { let filter = filter.from_block(KIMAP_FIRST_BLOCK); loop { match eth_provider.get_logs(&filter) { Ok(logs) => { for log in logs { - if let Err(e) = handle_log(our, state, &log) { + if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } }