From 239f68ec5fefd93138e2aa3307edbe78c56dd67b Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 19 Aug 2024 12:03:18 -0700 Subject: [PATCH 1/8] kns: add skeleton for pending notes --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 85ddf42e9..9863185ce 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -45,6 +45,12 @@ struct State { // human readable name to most recent on-chain routing information as json // TODO: optional params knsUpdate? also include tba. nodes: HashMap, + // pending notes + // when a new kimap node is minted, notes can come in before + // the mint; in that case, we store the note and process it in + // the next block, since we then have a guarantee the mint has + // been processed + pending_notes: BTreeMap>, // TODO: fill in note type // last block we have an update from last_block: u64, } @@ -71,6 +77,7 @@ fn init(our: Address) { contract_address: KIMAP_ADDRESS.parse::().unwrap(), nodes: HashMap::new(), names: HashMap::new(), + pending_notes: BTreeMap::new(), last_block: KIMAP_FIRST_BLOCK, }; @@ -295,6 +302,8 @@ fn handle_pending_requests( for block in blocks_to_remove.iter() { pending_requests.remove(block); } + + // TODO: handle_log() on each pending_note for blocks older than current block Ok(()) } @@ -339,6 +348,7 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul } let Some(node_name) = get_parent_name(&state.names, &node_hash) else { + // TODO: put in `pending_note` return Err(anyhow::anyhow!("parent node for note not found")); }; From ed34ec71893233aa5211627474e9db737d4fada5 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Mon, 19 Aug 2024 23:05:59 +0300 Subject: [PATCH 2/8] kns_indexer: pending_notes handling --- .../kns_indexer/kns_indexer/src/lib.rs | 239 +++++++++++------- 1 file changed, 148 insertions(+), 91 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 9863185ce..d783eeadd 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -43,14 +43,7 @@ struct State { // namehash to human readable name names: HashMap, // human readable name to most recent on-chain routing information as json - // TODO: optional params knsUpdate? also include tba. nodes: HashMap, - // pending notes - // when a new kimap node is minted, notes can come in before - // the mint; in that case, we store the note and process it in - // the next block, since we then have a guarantee the mint has - // been processed - pending_notes: BTreeMap>, // TODO: fill in note type // last block we have an update from last_block: u64, } @@ -77,7 +70,6 @@ fn init(our: Address) { contract_address: KIMAP_ADDRESS.parse::().unwrap(), nodes: HashMap::new(), names: HashMap::new(), - pending_notes: BTreeMap::new(), last_block: KIMAP_FIRST_BLOCK, }; @@ -132,11 +124,25 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if block in state is < current_block, get logs from that part. println!("syncing old logs..."); - fetch_and_process_logs(ð_provider, &our, &mut state, mints_filter.clone()); - fetch_and_process_logs(ð_provider, &our, &mut state, notes_filter.clone()); - println!("done syncing old logs."); + // if subscription results come back in the wrong order, we store them here + // until the right block is reached. let mut pending_requests: BTreeMap> = BTreeMap::new(); + let mut pending_notes: BTreeMap> = BTreeMap::new(); + + fetch_and_process_logs( + ð_provider, + &mut state, + mints_filter.clone(), + &mut pending_notes, + ); + fetch_and_process_logs( + ð_provider, + &mut state, + notes_filter.clone(), + &mut pending_notes, + ); + println!("done syncing old logs."); loop { let Ok(message) = await_message() else { @@ -150,10 +156,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { if source.process == "eth:distro:sys" { handle_eth_message( - &our, &mut state, ð_provider, &mut pending_requests, + &mut pending_notes, &body, &mints_filter, ¬es_filter, @@ -212,10 +218,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { } fn handle_eth_message( - our: &Address, state: &mut State, eth_provider: ð::Provider, pending_requests: &mut BTreeMap>, + pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, notes_filter: ð::Filter, @@ -223,19 +229,24 @@ fn handle_eth_message( match serde_json::from_slice::(body) { Ok(Ok(eth::EthSub { result, .. })) => { if let eth::SubscriptionResult::Log(log) = result { - if let Err(e) = handle_log(our, state, &log) { - // print errors at verbosity=1 + if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } } else if let eth::SubscriptionResult::Header(header) = result { if let Some(block) = header.number { // risque.. + // pending_requests/notes are kicked off with block numbers + // that are ahead of state.last_block. can be risky if event subscriptions and newHeads + // are completely out of sync. state.last_block = block; } } } Ok(Err(e)) => { - println!("got eth subscription error ({e:?}), resubscribing"); + print_to_terminal( + 1, + &format!("got eth subscription error ({e:?}), resubscribing"), + ); if e.id == 1 { eth_provider.subscribe_loop(1, mints_filter.clone()); } else if e.id == 2 { @@ -250,8 +261,7 @@ fn handle_eth_message( } handle_pending_requests(state, pending_requests)?; - - // set_state(&bincode::serialize(state)?); + handle_pending_notes(state, pending_notes)?; Ok(()) } @@ -303,12 +313,120 @@ fn handle_pending_requests( pending_requests.remove(block); } - // TODO: handle_log() on each pending_note for blocks older than current block Ok(()) } -fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Result<()> { - let node_name = match log.topics()[0] { +fn handle_pending_notes( + state: &mut State, + pending_notes: &mut BTreeMap>, +) -> anyhow::Result<()> { + if pending_notes.is_empty() { + return Ok(()); + } + let mut blocks_to_remove = vec![]; + + for (block, notes) in pending_notes.iter() { + // make sure we've seen the whole block + if *block < state.last_block { + for note in notes.iter() { + handle_note(state, note)?; + } + blocks_to_remove.push(*block); + } else { + break; + } + } + for block in blocks_to_remove.iter() { + pending_notes.remove(block); + } + + Ok(()) +} + +fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Result<()> { + let note_label = String::from_utf8(note.label.to_vec())?; + let node_hash = note.parenthash.to_string(); + + if !kimap::valid_note(¬e_label) { + return Err(anyhow::anyhow!("skipping invalid note: {note_label}")); + } + + let Some(node_name) = get_parent_name(&state.names, &node_hash) else { + return Err(anyhow::anyhow!("parent node for note not found")); + }; + + match note_label.as_str() { + "~ws-port" => { + let ws = bytes_to_port(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ports.insert("ws".to_string(), ws); + // port defined, -> direct + node.routers = vec![]; + } + } + "~tcp-port" => { + let tcp = bytes_to_port(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ports.insert("tcp".to_string(), tcp); + // port defined, -> direct + node.routers = vec![]; + } + } + "~net-key" => { + if note.data.len() != 32 { + return Err(anyhow::anyhow!("invalid net-key length")); + } + if let Some(node) = state.nodes.get_mut(&node_name) { + node.public_key = hex::encode(¬e.data); + } + } + "~routers" => { + let routers = decode_routers(¬e.data, state); + if let Some(node) = state.nodes.get_mut(&node_name) { + node.routers = routers; + // -> indirect + node.ports = BTreeMap::new(); + node.ips = vec![]; + } + } + "~ip" => { + let ip = bytes_to_ip(¬e.data)?; + if let Some(node) = state.nodes.get_mut(&node_name) { + node.ips = vec![ip.to_string()]; + // -> direct + node.routers = vec![]; + } + } + _other => { + // Ignore unknown notes + } + } + + // only send an update if we have a *full* set of data for networking: + // a node name, plus either or + + if let Some(node_info) = state.nodes.get(&node_name) { + if !node_info.public_key.is_empty() + && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) + || node_info.routers.len() > 0) + { + Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( + node_info.clone(), + ))?) + .send()?; + } + } + + Ok(()) +} + +fn handle_log( + state: &mut State, + pending_notes: &mut BTreeMap>, + log: ð::Log, +) -> anyhow::Result<()> { + match log.topics()[0] { kimap::contract::Mint::SIGNATURE_HASH => { let decoded = kimap::contract::Mint::decode_log_data(log.data(), true).unwrap(); let parent_hash = decoded.parenthash.to_string(); @@ -335,73 +453,27 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul routers: Vec::new(), }, ); - full_name } kimap::contract::Note::SIGNATURE_HASH => { let decoded = kimap::contract::Note::decode_log_data(log.data(), true).unwrap(); - - let note = String::from_utf8(decoded.label.to_vec())?; + let note: String = String::from_utf8(decoded.label.to_vec())?; let node_hash = decoded.parenthash.to_string(); if !kimap::valid_note(¬e) { return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - let Some(node_name) = get_parent_name(&state.names, &node_hash) else { - // TODO: put in `pending_note` + let Some(_node_name) = get_parent_name(&state.names, &node_hash) else { + pending_notes + .entry(log.block_number.unwrap()) + .or_insert(vec![]) + .push(decoded); return Err(anyhow::anyhow!("parent node for note not found")); }; - match note.as_str() { - "~ws-port" => { - let ws = bytes_to_port(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ports.insert("ws".to_string(), ws); - // port defined, -> direct - node.routers = vec![]; - } - } - "~tcp-port" => { - let tcp = bytes_to_port(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ports.insert("tcp".to_string(), tcp); - // port defined, -> direct - node.routers = vec![]; - } - } - "~net-key" => { - if decoded.data.len() != 32 { - return Err(anyhow::anyhow!("invalid net-key length")); - } - if let Some(node) = state.nodes.get_mut(&node_name) { - node.public_key = decoded.data.to_string(); - } - } - "~routers" => { - let routers = decode_routers(&decoded.data, &state); - if let Some(node) = state.nodes.get_mut(&node_name) { - node.routers = routers; - // -> indirect - node.ports = BTreeMap::new(); - node.ips = vec![]; - }; - } - "~ip" => { - let ip = bytes_to_ip(&decoded.data)?; - if let Some(node) = state.nodes.get_mut(&node_name) { - node.ips = vec![ip.to_string()]; - // -> direct - node.routers = vec![]; - }; - } - _other => { - // println!("unknown note: {other}"); - } - } - node_name + handle_note(state, &decoded)?; } _log => { - // println!("unknown log: {log:?}"); return Ok(()); } }; @@ -410,21 +482,6 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul state.last_block = block; } - // only send an update if we have a *full* set of data for networking: - // a node name, plus either or - - if let Some(node_info) = state.nodes.get(&node_name) { - if !node_info.public_key.is_empty() - && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) - || node_info.routers.len() > 0) - { - Request::to((&our.node, "net", "distro", "sys")) - .body(rmp_serde::to_vec(&net::NetAction::KnsUpdate( - node_info.clone(), - ))?) - .send()?; - } - } Ok(()) } @@ -432,16 +489,16 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul fn fetch_and_process_logs( eth_provider: ð::Provider, - our: &Address, state: &mut State, filter: eth::Filter, + pending_notes: &mut BTreeMap>, ) { let filter = filter.from_block(KIMAP_FIRST_BLOCK); loop { match eth_provider.get_logs(&filter) { Ok(logs) => { for log in logs { - if let Err(e) = handle_log(our, state, &log) { + if let Err(e) = handle_log(state, pending_notes, &log) { print_to_terminal(1, &format!("log-handling error! {e:?}")); } } From 02f6ded88e1ee2b652ff3b59791c80d86126a7b3 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Mon, 19 Aug 2024 23:44:59 +0300 Subject: [PATCH 3/8] kns_indexer: print fix --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index d783eeadd..ebf184fed 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -149,8 +149,6 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { continue; }; let Message::Request { source, body, .. } = message else { - // TODO we could store the subscription ID for eth - // in case we want to cancel/reset it continue; }; @@ -468,7 +466,9 @@ fn handle_log( .entry(log.block_number.unwrap()) .or_insert(vec![]) .push(decoded); - return Err(anyhow::anyhow!("parent node for note not found")); + return Err(anyhow::anyhow!( + "parent node for note not found, storing in pending_notes" + )); }; handle_note(state, &decoded)?; From 687312b5ed318728d974c69332925ef3f6b13913 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 19 Aug 2024 15:03:28 -0700 Subject: [PATCH 4/8] kns: protect from block ticker out-of-order --- Cargo.lock | 24 ++++++- .../kns_indexer/kns_indexer/Cargo.toml | 2 +- .../kns_indexer/kns_indexer/src/lib.rs | 70 ++++++++++++------- 3 files changed, 70 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1bc1f703e..32d59ccfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3411,6 +3411,28 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "kinode_process_lib" +version = "0.9.0" +source = "git+https://github.com/kinode-dao/process_lib?rev=64b4d9e#64b4d9ea06a5271708c6bf0a604634702e2ccfee" +dependencies = [ + "alloy", + "alloy-primitives", + "alloy-sol-macro", + "alloy-sol-types", + "anyhow", + "bincode", + "http 1.1.0", + "mime_guess", + "rand 0.8.5", + "rmp-serde", + "serde", + "serde_json", + "thiserror", + "url", + "wit-bindgen", +] + [[package]] name = "kit" version = "0.6.8" @@ -3484,7 +3506,7 @@ dependencies = [ "anyhow", "bincode", "hex", - "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)", + "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?rev=64b4d9e)", "rmp-serde", "serde", "serde_json", diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 437acb47e..797292773 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -12,7 +12,7 @@ alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" bincode = "1.3.3" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.9.0" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "64b4d9e" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index ebf184fed..df162bdcb 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -20,6 +20,9 @@ wit_bindgen::generate!({ additional_derives: [serde::Deserialize, serde::Serialize], }); +type PendingNotes = BTreeMap>; +const MAX_PENDING_ATTEMPTS: u8 = 3; + #[cfg(not(feature = "simulation-mode"))] const KIMAP_ADDRESS: &'static str = kimap::KIMAP_ADDRESS; // optimism #[cfg(feature = "simulation-mode")] @@ -128,7 +131,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if subscription results come back in the wrong order, we store them here // until the right block is reached. let mut pending_requests: BTreeMap> = BTreeMap::new(); - let mut pending_notes: BTreeMap> = BTreeMap::new(); + let mut pending_notes: PendingNotes = BTreeMap::new(); fetch_and_process_logs( ð_provider, @@ -219,7 +222,7 @@ fn handle_eth_message( state: &mut State, eth_provider: ð::Provider, pending_requests: &mut BTreeMap>, - pending_notes: &mut BTreeMap>, + pending_notes: &mut PendingNotes, body: &[u8], mints_filter: ð::Filter, notes_filter: ð::Filter, @@ -314,20 +317,17 @@ fn handle_pending_requests( Ok(()) } -fn handle_pending_notes( - state: &mut State, - pending_notes: &mut BTreeMap>, -) -> anyhow::Result<()> { +fn handle_pending_notes(state: &mut State, pending_notes: &mut PendingNotes) -> anyhow::Result<()> { if pending_notes.is_empty() { return Ok(()); } let mut blocks_to_remove = vec![]; - for (block, notes) in pending_notes.iter() { + for (block, notes) in pending_notes.clone().iter() { // make sure we've seen the whole block if *block < state.last_block { - for note in notes.iter() { - handle_note(state, note)?; + for (note, _) in notes.iter() { + handle_note(state, note, pending_notes)?; } blocks_to_remove.push(*block); } else { @@ -341,7 +341,11 @@ fn handle_pending_notes( Ok(()) } -fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Result<()> { +fn handle_note( + state: &mut State, + note: &kimap::contract::Note, + pending_notes: &mut PendingNotes, +) -> anyhow::Result<()> { let note_label = String::from_utf8(note.label.to_vec())?; let node_hash = note.parenthash.to_string(); @@ -350,7 +354,35 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul } let Some(node_name) = get_parent_name(&state.names, &node_hash) else { - return Err(anyhow::anyhow!("parent node for note not found")); + // give note MAX_PENDING_ATTEMPTS attempts to be loaded into state + // 1. a node that is minted and noted in same block may issue us note + // before mint + // 2. our ticking block number may be issued before mint + // therefore need to allow multiple attempts to get a note in for worst case + let Some(pending_notes_for_block) = pending_notes.get_mut(&state.last_block) else { + let mut pending_notes_for_block = HashMap::new(); + pending_notes_for_block.insert(note, 0); + return Err(anyhow::anyhow!( + "parent node for note not found, storing in pending_notes (attempt 0 of {})", + MAX_PENDING_ATTEMPTS, + )); + }; + let Some(attempt) = pending_notes_for_block.remove(note) else { + pending_notes_for_block.insert(note.clone(), 0); + return Err(anyhow::anyhow!( + "parent node for note not found, storing in pending_notes (attempt 0 of {})", + MAX_PENDING_ATTEMPTS, + )); + }; + if attempt >= MAX_PENDING_ATTEMPTS { + return Err(anyhow::anyhow!("parent node for note not found")); + } + pending_notes_for_block.insert(note.clone(), attempt + 1); + return Err(anyhow::anyhow!( + "parent node for note not found, storing in pending_notes (attempt {} of {})", + attempt + 1, + MAX_PENDING_ATTEMPTS, + )); }; match note_label.as_str() { @@ -421,7 +453,7 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul fn handle_log( state: &mut State, - pending_notes: &mut BTreeMap>, + pending_notes: &mut PendingNotes, log: ð::Log, ) -> anyhow::Result<()> { match log.topics()[0] { @@ -461,17 +493,7 @@ fn handle_log( return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - let Some(_node_name) = get_parent_name(&state.names, &node_hash) else { - pending_notes - .entry(log.block_number.unwrap()) - .or_insert(vec![]) - .push(decoded); - return Err(anyhow::anyhow!( - "parent node for note not found, storing in pending_notes" - )); - }; - - handle_note(state, &decoded)?; + handle_note(state, &decoded, pending_notes)?; } _log => { return Ok(()); @@ -491,7 +513,7 @@ fn fetch_and_process_logs( eth_provider: ð::Provider, state: &mut State, filter: eth::Filter, - pending_notes: &mut BTreeMap>, + pending_notes: &mut PendingNotes, ) { let filter = filter.from_block(KIMAP_FIRST_BLOCK); loop { From 54ae3837ea91ff743e56dbf1002ae0b8b432bfda Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 20 Aug 2024 01:37:46 +0300 Subject: [PATCH 5/8] kns_indexer: refactor note attempts --- .../kns_indexer/kns_indexer/src/lib.rs | 111 +++++++++--------- 1 file changed, 58 insertions(+), 53 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index df162bdcb..32541b302 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -20,9 +20,6 @@ wit_bindgen::generate!({ additional_derives: [serde::Deserialize, serde::Serialize], }); -type PendingNotes = BTreeMap>; -const MAX_PENDING_ATTEMPTS: u8 = 3; - #[cfg(not(feature = "simulation-mode"))] const KIMAP_ADDRESS: &'static str = kimap::KIMAP_ADDRESS; // optimism #[cfg(feature = "simulation-mode")] @@ -38,6 +35,8 @@ const KIMAP_FIRST_BLOCK: u64 = kimap::KIMAP_FIRST_BLOCK; // optimism #[cfg(feature = "simulation-mode")] const KIMAP_FIRST_BLOCK: u64 = 1; // local +const MAX_PENDING_ATTEMPTS: u8 = 3; + #[derive(Clone, Debug, Serialize, Deserialize)] struct State { chain_id: u64, @@ -131,7 +130,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // if subscription results come back in the wrong order, we store them here // until the right block is reached. let mut pending_requests: BTreeMap> = BTreeMap::new(); - let mut pending_notes: PendingNotes = BTreeMap::new(); + let mut pending_notes: BTreeMap> = BTreeMap::new(); fetch_and_process_logs( ð_provider, @@ -222,7 +221,7 @@ fn handle_eth_message( state: &mut State, eth_provider: ð::Provider, pending_requests: &mut BTreeMap>, - pending_notes: &mut PendingNotes, + pending_notes: &mut BTreeMap>, body: &[u8], mints_filter: ð::Filter, notes_filter: ð::Filter, @@ -317,35 +316,62 @@ fn handle_pending_requests( Ok(()) } -fn handle_pending_notes(state: &mut State, pending_notes: &mut PendingNotes) -> anyhow::Result<()> { +fn handle_pending_notes( + state: &mut State, + pending_notes: &mut BTreeMap>, +) -> anyhow::Result<()> { if pending_notes.is_empty() { return Ok(()); } let mut blocks_to_remove = vec![]; + let mut notes_to_retry = Vec::new(); - for (block, notes) in pending_notes.clone().iter() { - // make sure we've seen the whole block + for (block, notes) in pending_notes.iter_mut() { if *block < state.last_block { - for (note, _) in notes.iter() { - handle_note(state, note, pending_notes)?; + let mut keep_notes = Vec::new(); + for (note, attempt) in notes.drain(..) { + if attempt >= MAX_PENDING_ATTEMPTS { + continue; // skip notes that have exceeded max attempts + } + if let Err(e) = handle_note(state, ¬e) { + print_to_terminal( + 1, + &format!("pending note handling error! {e:?}, attempt {attempt}"), + ); + keep_notes.push((note, attempt + 1)); + } + } + if keep_notes.is_empty() { + blocks_to_remove.push(*block); + } else { + *notes = keep_notes; } - blocks_to_remove.push(*block); } else { - break; + notes_to_retry.extend( + notes + .drain(..) + .map(|(note, attempt)| (*block, note, attempt)), + ); } } - for block in blocks_to_remove.iter() { - pending_notes.remove(block); + + // remove processed blocks + for block in blocks_to_remove { + pending_notes.remove(&block); + } + + // re-insert notes that need to be retried + for (block, note, attempt) in notes_to_retry { + pending_notes + .entry(block) + .or_default() + .push((note, attempt)); } Ok(()) } -fn handle_note( - state: &mut State, - note: &kimap::contract::Note, - pending_notes: &mut PendingNotes, -) -> anyhow::Result<()> { +fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Result<()> { let note_label = String::from_utf8(note.label.to_vec())?; let node_hash = note.parenthash.to_string(); @@ -354,35 +380,7 @@ fn handle_note( } let Some(node_name) = get_parent_name(&state.names, &node_hash) else { - // give note MAX_PENDING_ATTEMPTS attempts to be loaded into state - // 1. a node that is minted and noted in same block may issue us note - // before mint - // 2. our ticking block number may be issued before mint - // therefore need to allow multiple attempts to get a note in for worst case - let Some(pending_notes_for_block) = pending_notes.get_mut(&state.last_block) else { - let mut pending_notes_for_block = HashMap::new(); - pending_notes_for_block.insert(note, 0); - return Err(anyhow::anyhow!( - "parent node for note not found, storing in pending_notes (attempt 0 of {})", - MAX_PENDING_ATTEMPTS, - )); - }; - let Some(attempt) = pending_notes_for_block.remove(note) else { - pending_notes_for_block.insert(note.clone(), 0); - return Err(anyhow::anyhow!( - "parent node for note not found, storing in pending_notes (attempt 0 of {})", - MAX_PENDING_ATTEMPTS, - )); - }; - if attempt >= MAX_PENDING_ATTEMPTS { - return Err(anyhow::anyhow!("parent node for note not found")); - } - pending_notes_for_block.insert(note.clone(), attempt + 1); - return Err(anyhow::anyhow!( - "parent node for note not found, storing in pending_notes (attempt {} of {})", - attempt + 1, - MAX_PENDING_ATTEMPTS, - )); + return Err(anyhow::anyhow!("parent node for note not found")); }; match note_label.as_str() { @@ -434,7 +432,6 @@ fn handle_note( // only send an update if we have a *full* set of data for networking: // a node name, plus either or - if let Some(node_info) = state.nodes.get(&node_name) { if !node_info.public_key.is_empty() && ((!node_info.ips.is_empty() && !node_info.ports.is_empty()) @@ -453,7 +450,7 @@ fn handle_note( fn handle_log( state: &mut State, - pending_notes: &mut PendingNotes, + pending_notes: &mut BTreeMap>, log: ð::Log, ) -> anyhow::Result<()> { match log.topics()[0] { @@ -487,13 +484,21 @@ fn handle_log( kimap::contract::Note::SIGNATURE_HASH => { let decoded = kimap::contract::Note::decode_log_data(log.data(), true).unwrap(); let note: String = String::from_utf8(decoded.label.to_vec())?; - let node_hash = decoded.parenthash.to_string(); if !kimap::valid_note(¬e) { return Err(anyhow::anyhow!("skipping invalid note: {note}")); } - handle_note(state, &decoded, pending_notes)?; + if let Err(e) = handle_note(state, &decoded) { + print_to_terminal(1, &format!("note-handling error! {e:?}")); + // If handling fails (likely due to parent not found), add to pending_notes + if let Some(block_number) = log.block_number { + pending_notes + .entry(block_number) + .or_default() + .push((decoded, 0)); + } + } } _log => { return Ok(()); @@ -513,7 +518,7 @@ fn fetch_and_process_logs( eth_provider: ð::Provider, state: &mut State, filter: eth::Filter, - pending_notes: &mut PendingNotes, + pending_notes: &mut BTreeMap>, ) { let filter = filter.from_block(KIMAP_FIRST_BLOCK); loop { From 17e97a35a96429450697c6126df2451458f16fe6 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 20 Aug 2024 01:54:56 +0300 Subject: [PATCH 6/8] kns_indexer: hotfix --- kinode/packages/kns_indexer/kns_indexer/src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 32541b302..5f9b06b36 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -346,12 +346,6 @@ fn handle_pending_notes( } else { *notes = keep_notes; } - } else { - notes_to_retry.extend( - notes - .drain(..) - .map(|(note, attempt)| (*block, note, attempt)), - ); } } From 79b00c4c26e098c1c042ed8b99de5d501a51cc6c Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 19 Aug 2024 16:43:07 -0700 Subject: [PATCH 7/8] kns: add final tweaks --- Cargo.lock | 2 +- .../kns_indexer/kns_indexer/Cargo.toml | 2 +- .../kns_indexer/kns_indexer/src/lib.rs | 61 ++++++++++++------- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32d59ccfc..2d0e4e394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3504,12 +3504,12 @@ dependencies = [ "alloy-primitives", "alloy-sol-types", "anyhow", - "bincode", "hex", "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?rev=64b4d9e)", "rmp-serde", "serde", "serde_json", + "thiserror", "wit-bindgen", ] diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 797292773..81a9e1e25 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -10,12 +10,12 @@ simulation-mode = [] anyhow = "1.0" alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" -bincode = "1.3.3" hex = "0.4.3" kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "64b4d9e" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +thiserror = "1.0" wit-bindgen = "0.24.0" [lib] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 5f9b06b36..5e14c5475 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -58,6 +58,12 @@ enum IndexerResponses { GetState(State), } +#[derive(Debug, thiserror::Error)] +enum KnsError { + #[error("Parent node for note not found")] + NoParentError, +} + call_init!(init); fn init(our: Address) { println!("indexing on contract address {KIMAP_ADDRESS}"); @@ -324,21 +330,34 @@ fn handle_pending_notes( return Ok(()); } let mut blocks_to_remove = vec![]; - let mut notes_to_retry = Vec::new(); for (block, notes) in pending_notes.iter_mut() { if *block < state.last_block { let mut keep_notes = Vec::new(); for (note, attempt) in notes.drain(..) { if attempt >= MAX_PENDING_ATTEMPTS { - continue; // skip notes that have exceeded max attempts - } - if let Err(e) = handle_note(state, ¬e) { + // skip notes that have exceeded max attempts print_to_terminal( 1, - &format!("pending note handling error! {e:?}, attempt {attempt}"), + &format!("dropping note from block {block} after {attempt} attempts"), ); - keep_notes.push((note, attempt + 1)); + continue; + } + if let Err(e) = handle_note(state, ¬e) { + match e.downcast_ref::() { + None => { + print_to_terminal(1, &format!("pending note handling error: {e:?}")) + } + Some(ee) => match ee { + KnsError::NoParentError => { + print_to_terminal( + 1, + &format!("note still awaiting mint; attempt {attempt}"), + ); + keep_notes.push((note, attempt + 1)); + } + }, + } } } if keep_notes.is_empty() { @@ -354,14 +373,6 @@ fn handle_pending_notes( pending_notes.remove(&block); } - // re-insert notes that need to be retried - for (block, note, attempt) in notes_to_retry { - pending_notes - .entry(block) - .or_default() - .push((note, attempt)); - } - Ok(()) } @@ -374,7 +385,7 @@ fn handle_note(state: &mut State, note: &kimap::contract::Note) -> anyhow::Resul } let Some(node_name) = get_parent_name(&state.names, &node_hash) else { - return Err(anyhow::anyhow!("parent node for note not found")); + return Err(KnsError::NoParentError.into()); }; match note_label.as_str() { @@ -484,13 +495,19 @@ fn handle_log( } if let Err(e) = handle_note(state, &decoded) { - print_to_terminal(1, &format!("note-handling error! {e:?}")); - // If handling fails (likely due to parent not found), add to pending_notes - if let Some(block_number) = log.block_number { - pending_notes - .entry(block_number) - .or_default() - .push((decoded, 0)); + match e.downcast_ref::() { + None => print_to_terminal(1, &format!("note handling error: {e:?}")), + Some(ee) => match ee { + KnsError::NoParentError => { + print_to_terminal(1, &format!("note awaiting mint: place in pending")); + if let Some(block_number) = log.block_number { + pending_notes + .entry(block_number) + .or_default() + .push((decoded, 0)); + } + } + }, } } } From 0eff0dcc10d8d3d599b905505a5a508181407e96 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 19 Aug 2024 16:45:40 -0700 Subject: [PATCH 8/8] kns: fix versions --- Cargo.lock | 24 +------------------ .../kns_indexer/kns_indexer/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d0e4e394..bd1476831 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3411,28 +3411,6 @@ dependencies = [ "wit-bindgen", ] -[[package]] -name = "kinode_process_lib" -version = "0.9.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=64b4d9e#64b4d9ea06a5271708c6bf0a604634702e2ccfee" -dependencies = [ - "alloy", - "alloy-primitives", - "alloy-sol-macro", - "alloy-sol-types", - "anyhow", - "bincode", - "http 1.1.0", - "mime_guess", - "rand 0.8.5", - "rmp-serde", - "serde", - "serde_json", - "thiserror", - "url", - "wit-bindgen", -] - [[package]] name = "kit" version = "0.6.8" @@ -3505,7 +3483,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "hex", - "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?rev=64b4d9e)", + "kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)", "rmp-serde", "serde", "serde_json", diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 81a9e1e25..dae91ceb7 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "64b4d9e" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.9.0" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0"