From ba3374e234038ef711f7b14a6c7f484a019828d9 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 17:10:10 -0300 Subject: [PATCH 1/2] change net api to be response-based instead of prints, add scripts for easy usage from term --- Cargo.lock | 40 ++++ Cargo.toml | 1 + .../kns_indexer/kns_indexer/src/lib.rs | 10 +- .../terminal/namehash_to_name/Cargo.toml | 17 ++ .../terminal/namehash_to_name/src/lib.rs | 80 ++++++++ .../terminal/net_diagnostics/Cargo.toml | 17 ++ .../terminal/net_diagnostics/src/lib.rs | 67 +++++++ kinode/packages/terminal/peer/Cargo.toml | 17 ++ kinode/packages/terminal/peer/src/lib.rs | 83 ++++++++ kinode/packages/terminal/peers/Cargo.toml | 17 ++ kinode/packages/terminal/peers/src/lib.rs | 77 ++++++++ kinode/packages/terminal/pkg/scripts.json | 54 +++++- kinode/packages/terminal/terminal/src/lib.rs | 28 ++- kinode/src/eth/mod.rs | 70 ++++--- kinode/src/kernel/mod.rs | 2 +- kinode/src/net/mod.rs | 177 ++++++++---------- kinode/src/net/utils.rs | 2 +- lib/src/core.rs | 21 ++- lib/src/eth.rs | 9 +- 19 files changed, 647 insertions(+), 142 deletions(-) create mode 100644 kinode/packages/terminal/namehash_to_name/Cargo.toml create mode 100644 kinode/packages/terminal/namehash_to_name/src/lib.rs create mode 100644 kinode/packages/terminal/net_diagnostics/Cargo.toml create mode 100644 kinode/packages/terminal/net_diagnostics/src/lib.rs create mode 100644 kinode/packages/terminal/peer/Cargo.toml create mode 100644 kinode/packages/terminal/peer/src/lib.rs create mode 100644 kinode/packages/terminal/peers/Cargo.toml create mode 100644 kinode/packages/terminal/peers/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 872514bc7..ba76a973f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3008,6 +3008,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "namehash_to_name" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3026,6 +3036,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "net_diagnostics" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3283,6 +3303,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "peer" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + +[[package]] +name = "peers" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "rmp-serde", + "serde", + "wit-bindgen", +] + [[package]] name = "percent-encoding" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index cbd8822cd..c37931177 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "kinode/packages/kns_indexer/kns_indexer", "kinode/packages/kns_indexer/get_block", "kinode/packages/terminal/terminal", "kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/hi", "kinode/packages/terminal/m", "kinode/packages/terminal/top", + "kinode/packages/terminal/namehash_to_name", "kinode/packages/terminal/net_diagnostics", "kinode/packages/terminal/peer", "kinode/packages/terminal/peers", "kinode/packages/tester/tester", "kinode/packages/tester/test_runner", ] default-members = ["lib"] diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index 16b889d1d..3269949e2 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -49,12 +49,12 @@ pub enum IndexerRequests { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { +pub enum NetAction { KnsUpdate(KnsUpdate), KnsBatchUpdate(Vec), } -impl TryInto> for NetActions { +impl TryInto> for NetAction { type Error = anyhow::Error; fn try_into(self) -> Result, Self::Error> { Ok(rmp_serde::to_vec(&self)?) @@ -172,7 +172,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // shove all state into net::net Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsBatchUpdate( + .try_body(NetAction::KnsBatchUpdate( state.nodes.values().cloned().collect::>(), ))? .send()?; @@ -214,7 +214,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { // shove all state into net::net Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsBatchUpdate( + .try_body(NetAction::KnsBatchUpdate( state.nodes.values().cloned().collect::>(), ))? .send()?; @@ -403,7 +403,7 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul { Request::new() .target((&our.node, "net", "distro", "sys")) - .try_body(NetActions::KnsUpdate(node.clone()))? + .try_body(NetAction::KnsUpdate(node.clone()))? .send()?; } diff --git a/kinode/packages/terminal/namehash_to_name/Cargo.toml b/kinode/packages/terminal/namehash_to_name/Cargo.toml new file mode 100644 index 000000000..b54bcc3e9 --- /dev/null +++ b/kinode/packages/terminal/namehash_to_name/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "namehash_to_name" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/namehash_to_name/src/lib.rs b/kinode/packages/terminal/namehash_to_name/src/lib.rs new file mode 100644 index 000000000..90b041fed --- /dev/null +++ b/kinode/packages/terminal/namehash_to_name/src/lib.rs @@ -0,0 +1,80 @@ +use kinode_process_lib::{ + await_next_request_body, call_init, println, Address, Message, NodeId, Request, +}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(args) = await_next_request_body() else { + println!("failed to get args, aborting"); + return; + }; + let Ok(namehash) = String::from_utf8(args) else { + println!("argument must be a string"); + return; + }; + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetName(namehash.clone())).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get name from networking module"); + return; + }; + let Ok(NetResponse::Name(maybe_name)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + match maybe_name { + Some(name) => println!("{namehash}: {name}"), + None => println!("no name found for {namehash}"), + } +} diff --git a/kinode/packages/terminal/net_diagnostics/Cargo.toml b/kinode/packages/terminal/net_diagnostics/Cargo.toml new file mode 100644 index 000000000..72c8dfd12 --- /dev/null +++ b/kinode/packages/terminal/net_diagnostics/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "net_diagnostics" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/net_diagnostics/src/lib.rs b/kinode/packages/terminal/net_diagnostics/src/lib.rs new file mode 100644 index 000000000..9079fd36b --- /dev/null +++ b/kinode/packages/terminal/net_diagnostics/src/lib.rs @@ -0,0 +1,67 @@ +use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetDiagnostics).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get diagnostics from networking module"); + return; + }; + let Ok(NetResponse::Diagnostics(printout)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + println!("{printout}"); +} diff --git a/kinode/packages/terminal/peer/Cargo.toml b/kinode/packages/terminal/peer/Cargo.toml new file mode 100644 index 000000000..0446cc120 --- /dev/null +++ b/kinode/packages/terminal/peer/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "peer" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/peer/src/lib.rs b/kinode/packages/terminal/peer/src/lib.rs new file mode 100644 index 000000000..9aa05f430 --- /dev/null +++ b/kinode/packages/terminal/peer/src/lib.rs @@ -0,0 +1,83 @@ +use kinode_process_lib::{ + await_next_request_body, call_init, println, Address, Message, NodeId, Request, +}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(args) = await_next_request_body() else { + println!("failed to get args, aborting"); + return; + }; + let Ok(name) = String::from_utf8(args) else { + println!("argument must be a string"); + return; + }; + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetPeer(name.clone())).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get response from networking module"); + return; + }; + let Ok(NetResponse::Peer(maybe_peer_id)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + match maybe_peer_id { + Some(peer_id) => println!( + "peer identity for {}:\n networking key: {}\n routing: {:?}\n routers: {:?}", + peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers + ), + None => println!("no PKI entry found with name {name}"), + } +} diff --git a/kinode/packages/terminal/peers/Cargo.toml b/kinode/packages/terminal/peers/Cargo.toml new file mode 100644 index 000000000..269016293 --- /dev/null +++ b/kinode/packages/terminal/peers/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "peers" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +rmp-serde = "1.1.2" +serde = { version = "1.0", features = ["derive"] } +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/terminal/peers/src/lib.rs b/kinode/packages/terminal/peers/src/lib.rs new file mode 100644 index 000000000..bc324e222 --- /dev/null +++ b/kinode/packages/terminal/peers/src/lib.rs @@ -0,0 +1,77 @@ +use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request}; +use serde::{Deserialize, Serialize}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +// types copied from runtime networking core + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Identity { + pub name: NodeId, + pub networking_key: String, + pub ws_routing: Option<(String, u16)>, + pub allowed_routers: Vec, +} + +/// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetAction { + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponse { + Accepted(NodeId), + Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), +} + +call_init!(init); + +fn init(_our: Address) { + let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys")) + .body(rmp_serde::to_vec(&NetAction::GetPeers).unwrap()) + .send_and_await_response(5) + else { + println!("failed to get peers from networking module"); + return; + }; + let Ok(NetResponse::Peers(identities)) = rmp_serde::from_slice(&body) else { + println!("got malformed response from networking module"); + return; + }; + let identities = identities + .iter() + .map(|peer_id| { + format!( + "{}:\n networking key: {}\n routing: {:?}\n routers: {:?}", + peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers + ) + }) + .collect::>() + .join("\n"); + println!("identities of current connected peers:\n{identities}"); +} diff --git a/kinode/packages/terminal/pkg/scripts.json b/kinode/packages/terminal/pkg/scripts.json index d555c90ca..6939c3733 100644 --- a/kinode/packages/terminal/pkg/scripts.json +++ b/kinode/packages/terminal/pkg/scripts.json @@ -41,6 +41,55 @@ "net:distro:sys" ] }, + "m.wasm": { + "root": true, + "public": true, + "request_networking": true + }, + "namehash_to_name.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "net_diagnostics.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "peer.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, + "peers.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "net:distro:sys" + ], + "grant_capabilities": [ + "net:distro:sys" + ] + }, "top.wasm": { "root": false, "public": false, @@ -49,10 +98,5 @@ "kernel:distro:sys" ], "grant_capabilities": [] - }, - "m.wasm": { - "root": true, - "public": true, - "request_networking": true } } \ No newline at end of file diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs index cdb3053d8..831eaee1e 100644 --- a/kinode/packages/terminal/terminal/src/lib.rs +++ b/kinode/packages/terminal/terminal/src/lib.rs @@ -62,27 +62,43 @@ impl Guest for Component { aliases: HashMap::from([ ( "alias".to_string(), - "alias:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("alias"), "terminal", "sys"), ), ( "cat".to_string(), - "cat:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("cat"), "terminal", "sys"), ), ( "echo".to_string(), - "echo:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("echo"), "terminal", "sys"), ), ( "hi".to_string(), - "hi:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("hi"), "terminal", "sys"), ), ( "m".to_string(), - "m:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("m"), "terminal", "sys"), + ), + ( + "namehash_to_name".to_string(), + ProcessId::new(Some("namehash_to_name"), "terminal", "sys"), + ), + ( + "net_diagnostics".to_string(), + ProcessId::new(Some("net_diagnostics"), "terminal", "sys"), + ), + ( + "peer".to_string(), + ProcessId::new(Some("peer"), "terminal", "sys"), + ), + ( + "peers".to_string(), + ProcessId::new(Some("peers"), "terminal", "sys"), ), ( "top".to_string(), - "top:terminal:sys".parse::().unwrap(), + ProcessId::new(Some("top"), "terminal", "sys"), ), ]), }, diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 021c02ad5..d7270f7e3 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -142,27 +142,6 @@ struct ModuleState { print_tx: PrintSender, } -async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> { - match Url::parse(&provider.url)?.scheme() { - "ws" | "wss" => { - let connector = WsConnect { - url: provider.url.to_string(), - auth: None, - }; - let client = tokio::time::timeout( - std::time::Duration::from_secs(10), - ClientBuilder::default().ws(connector), - ) - .await??; - provider.pubsub = Some(Provider::new_with_client(client)); - Ok(()) - } - _ => Err(anyhow::anyhow!( - "Only `ws://` or `wss://` providers are supported." - )), - } -} - /// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers /// and using them to service indexing requests from other apps. This is the runtime entry point /// for the entire module. @@ -674,10 +653,59 @@ async fn handle_eth_config_action( EthConfigAction::GetAccessSettings => { return EthConfigResponse::AccessSettings(state.access_settings.clone()); } + EthConfigAction::GetState => { + return EthConfigResponse::State { + active_subscriptions: state + .active_subscriptions + .iter() + .map(|e| { + ( + e.key().clone(), + e.value() + .iter() + .map(|(id, sub)| { + ( + *id, + match sub { + ActiveSub::Local(_) => None, + ActiveSub::Remote { provider_node, .. } => { + Some(provider_node.clone()) + } + }, + ) + }) + .collect(), + ) + }) + .collect(), + outstanding_requests: state.response_channels.iter().map(|e| *e.key()).collect(), + }; + } } EthConfigResponse::Ok } +async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> { + match Url::parse(&provider.url)?.scheme() { + "ws" | "wss" => { + let connector = WsConnect { + url: provider.url.to_string(), + auth: None, + }; + let client = tokio::time::timeout( + std::time::Duration::from_secs(10), + ClientBuilder::default().ws(connector), + ) + .await??; + provider.pubsub = Some(Provider::new_with_client(client)); + Ok(()) + } + _ => Err(anyhow::anyhow!( + "Only `ws://` or `wss://` providers are supported." + )), + } +} + fn providers_to_saved_configs(providers: &Providers) -> SavedConfigs { providers .iter() diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index c2b6c0fb5..f9813ec0e 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -860,7 +860,7 @@ pub async fn kernel( message: t::Message::Request(t::Request { inherit: false, expects_response: None, - body: rmp_serde::to_vec(&t::NetActions::KnsBatchUpdate(default_pki_entries)) + body: rmp_serde::to_vec(&t::NetAction::KnsBatchUpdate(default_pki_entries)) .unwrap(), metadata: None, capabilities: vec![], diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs index 88b46e386..b83b798eb 100644 --- a/kinode/src/net/mod.rs +++ b/kinode/src/net/mod.rs @@ -889,19 +889,20 @@ async fn handle_local_message( Message::Response((response, _context)) => { // these are received as a router, when we send ConnectionRequests // to a node we do routing for. - match rmp_serde::from_slice::(&response.body) { - Ok(NetResponses::Accepted(_)) => { + match rmp_serde::from_slice::(&response.body) { + Ok(NetResponse::Accepted(_)) => { // TODO anything here? } - Ok(NetResponses::Rejected(to)) => { + Ok(NetResponse::Rejected(to)) => { // drop from our pending map // this will drop the socket, causing initiator to see it as failed pending_passthroughs .ok_or(anyhow!("got net response as non-router"))? .remove(&(to, km.source.node)); } - Err(_) => { - // this is usually the "delivered" response to a raw message + _ => { + // this is the "delivered" response to a raw message, + // or a response to a Get that was somehow given.. ignore } } return Ok(()); @@ -909,16 +910,16 @@ async fn handle_local_message( }; if km.source.node != our.name { - if let Ok(act) = rmp_serde::from_slice::(body) { + if let Ok(act) = rmp_serde::from_slice::(body) { match act { - NetActions::KnsBatchUpdate(_) | NetActions::KnsUpdate(_) => { + NetAction::KnsBatchUpdate(_) | NetAction::KnsUpdate(_) => { // for now, we don't get these from remote. } - NetActions::ConnectionRequest(from) => { + NetAction::ConnectionRequest(from) => { // someone wants to open a passthrough with us through a router! // if we are an indirect node, and source is one of our routers, // respond by attempting to init a matching passthrough. - let res: Result = if our.allowed_routers.contains(&km.source.node) + let res: Result = if our.allowed_routers.contains(&km.source.node) { let router_id = peers .get(&km.source.node) @@ -942,9 +943,9 @@ async fn handle_local_message( print_tx, ) .await; - Ok(NetResponses::Accepted(from.clone())) + Ok(NetResponse::Accepted(from.clone())) } else { - Ok(NetResponses::Rejected(from.clone())) + Ok(NetResponse::Rejected(from.clone())) }; kernel_message_tx .send(KernelMessage { @@ -959,7 +960,7 @@ async fn handle_local_message( Response { inherit: false, body: rmp_serde::to_vec( - &res.unwrap_or(NetResponses::Rejected(from)), + &res.unwrap_or(NetResponse::Rejected(from)), )?, metadata: None, capabilities: vec![], @@ -970,6 +971,9 @@ async fn handle_local_message( }) .await?; } + _ => { + // we don't accept any other actions from remote + } } return Ok(()); }; @@ -978,14 +982,12 @@ async fn handle_local_message( parse_hello_message(our, &km, body, kernel_message_tx, print_tx).await?; Ok(()) } else { - // available commands: "peers", "pki", "names", "diagnostics" - // first parse as raw string, then deserialize to NetActions object - let mut printout = String::new(); - match rmp_serde::from_slice::(body) { - Ok(NetActions::ConnectionRequest(_)) => { + let maybe_response = match rmp_serde::from_slice::(body)? { + NetAction::ConnectionRequest(_) => { // we shouldn't receive these from ourselves. + None } - Ok(NetActions::KnsUpdate(log)) => { + NetAction::KnsUpdate(log) => { pki.insert( log.name.clone(), Identity { @@ -1000,8 +1002,9 @@ async fn handle_local_message( }, ); names.insert(log.node, log.name); + None } - Ok(NetActions::KnsBatchUpdate(log_list)) => { + NetAction::KnsBatchUpdate(log_list) => { for log in log_list { pki.insert( log.name.clone(), @@ -1018,96 +1021,70 @@ async fn handle_local_message( ); names.insert(log.node, log.name); } + None } - _ => match std::str::from_utf8(body) { - Ok("peers") => { + NetAction::GetPeers => Some(NetResponse::Peers( + peers + .iter() + .map(|p| p.identity.clone()) + .collect::>(), + )), + NetAction::GetPeer(peer_name) => Some(NetResponse::Peer( + peers.get(&peer_name).map(|p| p.identity.clone()), + )), + NetAction::GetName(namehash) => { + Some(NetResponse::Name(names.get(&namehash).map(|n| n.clone()))) + } + NetAction::GetDiagnostics => { + let mut printout = String::new(); + printout.push_str(&format!( + "indexing from contract address {}\r\n", + contract_address + )); + printout.push_str(&format!("our Identity: {:#?}\r\n", our)); + printout.push_str("we have connections with peers:\r\n"); + for peer in peers.iter() { printout.push_str(&format!( - "{:#?}", - peers - .iter() - .map(|p| p.identity.name.clone()) - .collect::>() + " {}, routing_for={}\r\n", + peer.identity.name, peer.routing_for, )); } - Ok("pki") => { - printout.push_str(&format!("{:#?}", pki)); - } - Ok("names") => { - printout.push_str(&format!("{:#?}", names)); - } - Ok("diagnostics") => { + printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); + if pending_passthroughs.is_some() { printout.push_str(&format!( - "indexing from contract address {}\r\n", - contract_address + "we have {} pending passthrough connections\r\n", + pending_passthroughs.unwrap().len() )); - printout.push_str(&format!("our Identity: {:#?}\r\n", our)); - printout.push_str("we have connections with peers:\r\n"); - for peer in peers.iter() { - printout.push_str(&format!( - " {}, routing_for={}\r\n", - peer.identity.name, peer.routing_for, - )); - } - printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len())); - if pending_passthroughs.is_some() { - printout.push_str(&format!( - "we have {} pending passthrough connections\r\n", - pending_passthroughs.unwrap().len() - )); - } - if forwarding_connections.is_some() { - printout.push_str(&format!( - "we have {} open passthrough connections\r\n", - forwarding_connections.unwrap().len() - )); - } - } - Ok(other) => { - // parse non-commands as a request to fetch networking data - // about a specific node name - printout.push_str(&format!("net: printing known identity for {}\r\n", other)); - match pki.get(other) { - Some(id) => { - printout.push_str(&format!("{:#?}", *id)); - } - None => { - printout.push_str("no such identity known!"); - } - } } - _ => {} - }, - } - if !printout.is_empty() { - if let Message::Request(req) = km.message { - if req.expects_response.is_some() { - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), - }, - target: km.rsvp.unwrap_or(km.source), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: printout.clone().into_bytes(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await?; + if forwarding_connections.is_some() { + printout.push_str(&format!( + "we have {} open passthrough connections\r\n", + forwarding_connections.unwrap().len() + )); } + Some(NetResponse::Diagnostics(printout)) } - print_tx - .send(Printout { - verbosity: 0, - content: printout, + }; + if let Some(response_body) = maybe_response { + kernel_message_tx + .send(KernelMessage { + id: km.id, + source: Address { + node: our.name.clone(), + process: ProcessId::new(Some("net"), "distro", "sys"), + }, + target: km.rsvp.unwrap_or(km.source), + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + body: rmp_serde::to_vec(&response_body)?, + metadata: None, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: None, }) .await?; } diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs index 69fbe3c83..5e1ef64bd 100644 --- a/kinode/src/net/utils.rs +++ b/kinode/src/net/utils.rs @@ -226,7 +226,7 @@ pub async fn create_passthrough( message: Message::Request(Request { inherit: false, expects_response: Some(5), - body: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?, + body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?, metadata: None, capabilities: vec![], }), diff --git a/lib/src/core.rs b/lib/src/core.rs index b05a0f734..5d2353c82 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1532,8 +1532,9 @@ pub enum TimerAction { // /// Must be parsed from message pack vector. +/// all Get actions must be sent from local process. used for debugging #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { +pub enum NetAction { /// Received from a router of ours when they have a new pending passthrough for us. /// We should respond (if we desire) by using them to initialize a routed connection /// with the NodeId given. @@ -1542,14 +1543,30 @@ pub enum NetActions { /// in the future could get from remote provider KnsUpdate(KnsUpdate), KnsBatchUpdate(Vec), + /// get a list of peers we are connected to + GetPeers, + /// get the [`Identity`] struct for a single peer + GetPeer(String), + /// get the [`NodeId`] associated with a given namehash, if any + GetName(String), + /// get a user-readable diagnostics string containing networking inforamtion + GetDiagnostics, } /// For now, only sent in response to a ConnectionRequest. /// Must be parsed from message pack vector #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetResponses { +pub enum NetResponse { Accepted(NodeId), Rejected(NodeId), + /// response to [`NetAction::GetPeers`] + Peers(Vec), + /// response to [`NetAction::GetPeer`] + Peer(Option), + /// response to [`NetAction::GetName`] + Name(Option), + /// response to [`NetAction::GetDiagnostics`]. A user-readable string. + Diagnostics(String), } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/lib/src/eth.rs b/lib/src/eth.rs index a66900d86..e8026b5eb 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -1,6 +1,6 @@ use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; /// The Action and Request type that can be made to eth:distro:sys. Any process with messaging /// capabilities can send this action to the eth provider. @@ -110,6 +110,8 @@ pub enum EthConfigAction { GetProviders, /// Get the current access settings. GetAccessSettings, + /// Get the state of calls and subscriptions. Used for debugging. + GetState, } /// Response type from an [`EthConfigAction`] request. @@ -124,6 +126,11 @@ pub enum EthConfigResponse { AccessSettings(AccessSettings), /// Permission denied due to missing capability PermissionDenied, + /// Response from a GetState request + State { + active_subscriptions: HashMap>>, // None if local, Some(node_provider_name) if remote + outstanding_requests: HashSet, + }, } /// Settings for our ETH provider From bbe5905e59a0b3bec668081ad5a57ca535f3227b Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 5 Mar 2024 17:31:43 -0300 Subject: [PATCH 2/2] update readme --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3dbcaf3ad..1c1e7923f 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,8 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu - CTRL+R to search history, CTRL+R again to toggle through search results, CTRL+G to cancel search - `m
`: send an inter-process message.
is formatted as @. is formatted as ::. JSON containing spaces must be wrapped in single-quotes (`''`). - - Example: `m our@net:distro:sys diagnostics` + - Example: `m our@eth:distro:sys "SetPublic" -a 5` + - the '-a' flag is used to expect a response with a given timeout - `our` will always be interpolated by the system as your node's name - `hi `: send a text message to another node's command line. - Example: `hi ben.os hello world` @@ -114,6 +115,9 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu - Example: `cat /terminal:sys/pkg/scripts.json` - `echo `: print `text` to the terminal - Example: `echo foo` +- `net_diagnostics`: print some useful networking diagnostic data +- `peers`: print the peers the node currently hold connections with +- `peer `: print the peer's PKI info, if it exists ### Terminal example usage