diff --git a/Cargo.lock b/Cargo.lock
index 872514bc7..ba76a973f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3008,6 +3008,16 @@ dependencies = [
"version_check",
]
+[[package]]
+name = "namehash_to_name"
+version = "0.1.0"
+dependencies = [
+ "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)",
+ "rmp-serde",
+ "serde",
+ "wit-bindgen",
+]
+
[[package]]
name = "native-tls"
version = "0.2.11"
@@ -3026,6 +3036,16 @@ dependencies = [
"tempfile",
]
+[[package]]
+name = "net_diagnostics"
+version = "0.1.0"
+dependencies = [
+ "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)",
+ "rmp-serde",
+ "serde",
+ "wit-bindgen",
+]
+
[[package]]
name = "nibble_vec"
version = "0.1.0"
@@ -3283,6 +3303,26 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
+[[package]]
+name = "peer"
+version = "0.1.0"
+dependencies = [
+ "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)",
+ "rmp-serde",
+ "serde",
+ "wit-bindgen",
+]
+
+[[package]]
+name = "peers"
+version = "0.1.0"
+dependencies = [
+ "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)",
+ "rmp-serde",
+ "serde",
+ "wit-bindgen",
+]
+
[[package]]
name = "percent-encoding"
version = "2.3.1"
diff --git a/Cargo.toml b/Cargo.toml
index cbd8822cd..c37931177 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
"kinode/packages/kns_indexer/kns_indexer", "kinode/packages/kns_indexer/get_block",
"kinode/packages/terminal/terminal",
"kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/hi", "kinode/packages/terminal/m", "kinode/packages/terminal/top",
+ "kinode/packages/terminal/namehash_to_name", "kinode/packages/terminal/net_diagnostics", "kinode/packages/terminal/peer", "kinode/packages/terminal/peers",
"kinode/packages/tester/tester", "kinode/packages/tester/test_runner",
]
default-members = ["lib"]
diff --git a/README.md b/README.md
index 3dbcaf3ad..1c1e7923f 100644
--- a/README.md
+++ b/README.md
@@ -103,7 +103,8 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu
- CTRL+R to search history, CTRL+R again to toggle through search results, CTRL+G to cancel search
- `m
`: send an inter-process message. is formatted as @. is formatted as ::. JSON containing spaces must be wrapped in single-quotes (`''`).
- - Example: `m our@net:distro:sys diagnostics`
+ - Example: `m our@eth:distro:sys "SetPublic" -a 5`
+ - the '-a' flag is used to expect a response with a given timeout
- `our` will always be interpolated by the system as your node's name
- `hi `: send a text message to another node's command line.
- Example: `hi ben.os hello world`
@@ -114,6 +115,9 @@ The `sys` publisher is not a real node ID, but it's also not a special case valu
- Example: `cat /terminal:sys/pkg/scripts.json`
- `echo `: print `text` to the terminal
- Example: `echo foo`
+- `net_diagnostics`: print some useful networking diagnostic data
+- `peers`: print the peers the node currently hold connections with
+- `peer `: print the peer's PKI info, if it exists
### Terminal example usage
diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
index 16b889d1d..3269949e2 100644
--- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
+++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs
@@ -49,12 +49,12 @@ pub enum IndexerRequests {
}
#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum NetActions {
+pub enum NetAction {
KnsUpdate(KnsUpdate),
KnsBatchUpdate(Vec),
}
-impl TryInto> for NetActions {
+impl TryInto> for NetAction {
type Error = anyhow::Error;
fn try_into(self) -> Result, Self::Error> {
Ok(rmp_serde::to_vec(&self)?)
@@ -172,7 +172,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// shove all state into net::net
Request::new()
.target((&our.node, "net", "distro", "sys"))
- .try_body(NetActions::KnsBatchUpdate(
+ .try_body(NetAction::KnsBatchUpdate(
state.nodes.values().cloned().collect::>(),
))?
.send()?;
@@ -214,7 +214,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// shove all state into net::net
Request::new()
.target((&our.node, "net", "distro", "sys"))
- .try_body(NetActions::KnsBatchUpdate(
+ .try_body(NetAction::KnsBatchUpdate(
state.nodes.values().cloned().collect::>(),
))?
.send()?;
@@ -403,7 +403,7 @@ fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Resul
{
Request::new()
.target((&our.node, "net", "distro", "sys"))
- .try_body(NetActions::KnsUpdate(node.clone()))?
+ .try_body(NetAction::KnsUpdate(node.clone()))?
.send()?;
}
diff --git a/kinode/packages/terminal/namehash_to_name/Cargo.toml b/kinode/packages/terminal/namehash_to_name/Cargo.toml
new file mode 100644
index 000000000..b54bcc3e9
--- /dev/null
+++ b/kinode/packages/terminal/namehash_to_name/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "namehash_to_name"
+version = "0.1.0"
+edition = "2021"
+
+
+[dependencies]
+kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" }
+rmp-serde = "1.1.2"
+serde = { version = "1.0", features = ["derive"] }
+wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" }
+
+[lib]
+crate-type = ["cdylib"]
+
+[package.metadata.component]
+package = "kinode:process"
diff --git a/kinode/packages/terminal/namehash_to_name/src/lib.rs b/kinode/packages/terminal/namehash_to_name/src/lib.rs
new file mode 100644
index 000000000..90b041fed
--- /dev/null
+++ b/kinode/packages/terminal/namehash_to_name/src/lib.rs
@@ -0,0 +1,80 @@
+use kinode_process_lib::{
+ await_next_request_body, call_init, println, Address, Message, NodeId, Request,
+};
+use serde::{Deserialize, Serialize};
+
+wit_bindgen::generate!({
+ path: "wit",
+ world: "process",
+ exports: {
+ world: Component,
+ },
+});
+
+// types copied from runtime networking core
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Identity {
+ pub name: NodeId,
+ pub networking_key: String,
+ pub ws_routing: Option<(String, u16)>,
+ pub allowed_routers: Vec,
+}
+
+/// Must be parsed from message pack vector.
+/// all Get actions must be sent from local process. used for debugging
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetAction {
+ /// get a list of peers we are connected to
+ GetPeers,
+ /// get the [`Identity`] struct for a single peer
+ GetPeer(String),
+ /// get the [`NodeId`] associated with a given namehash, if any
+ GetName(String),
+ /// get a user-readable diagnostics string containing networking inforamtion
+ GetDiagnostics,
+}
+
+/// For now, only sent in response to a ConnectionRequest.
+/// Must be parsed from message pack vector
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetResponse {
+ Accepted(NodeId),
+ Rejected(NodeId),
+ /// response to [`NetAction::GetPeers`]
+ Peers(Vec),
+ /// response to [`NetAction::GetPeer`]
+ Peer(Option),
+ /// response to [`NetAction::GetName`]
+ Name(Option),
+ /// response to [`NetAction::GetDiagnostics`]. A user-readable string.
+ Diagnostics(String),
+}
+
+call_init!(init);
+
+fn init(_our: Address) {
+ let Ok(args) = await_next_request_body() else {
+ println!("failed to get args, aborting");
+ return;
+ };
+ let Ok(namehash) = String::from_utf8(args) else {
+ println!("argument must be a string");
+ return;
+ };
+ let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys"))
+ .body(rmp_serde::to_vec(&NetAction::GetName(namehash.clone())).unwrap())
+ .send_and_await_response(5)
+ else {
+ println!("failed to get name from networking module");
+ return;
+ };
+ let Ok(NetResponse::Name(maybe_name)) = rmp_serde::from_slice(&body) else {
+ println!("got malformed response from networking module");
+ return;
+ };
+ match maybe_name {
+ Some(name) => println!("{namehash}: {name}"),
+ None => println!("no name found for {namehash}"),
+ }
+}
diff --git a/kinode/packages/terminal/net_diagnostics/Cargo.toml b/kinode/packages/terminal/net_diagnostics/Cargo.toml
new file mode 100644
index 000000000..72c8dfd12
--- /dev/null
+++ b/kinode/packages/terminal/net_diagnostics/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "net_diagnostics"
+version = "0.1.0"
+edition = "2021"
+
+
+[dependencies]
+kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" }
+rmp-serde = "1.1.2"
+serde = { version = "1.0", features = ["derive"] }
+wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" }
+
+[lib]
+crate-type = ["cdylib"]
+
+[package.metadata.component]
+package = "kinode:process"
diff --git a/kinode/packages/terminal/net_diagnostics/src/lib.rs b/kinode/packages/terminal/net_diagnostics/src/lib.rs
new file mode 100644
index 000000000..9079fd36b
--- /dev/null
+++ b/kinode/packages/terminal/net_diagnostics/src/lib.rs
@@ -0,0 +1,67 @@
+use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request};
+use serde::{Deserialize, Serialize};
+
+wit_bindgen::generate!({
+ path: "wit",
+ world: "process",
+ exports: {
+ world: Component,
+ },
+});
+
+// types copied from runtime networking core
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Identity {
+ pub name: NodeId,
+ pub networking_key: String,
+ pub ws_routing: Option<(String, u16)>,
+ pub allowed_routers: Vec,
+}
+
+/// Must be parsed from message pack vector.
+/// all Get actions must be sent from local process. used for debugging
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetAction {
+ /// get a list of peers we are connected to
+ GetPeers,
+ /// get the [`Identity`] struct for a single peer
+ GetPeer(String),
+ /// get the [`NodeId`] associated with a given namehash, if any
+ GetName(String),
+ /// get a user-readable diagnostics string containing networking inforamtion
+ GetDiagnostics,
+}
+
+/// For now, only sent in response to a ConnectionRequest.
+/// Must be parsed from message pack vector
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetResponse {
+ Accepted(NodeId),
+ Rejected(NodeId),
+ /// response to [`NetAction::GetPeers`]
+ Peers(Vec),
+ /// response to [`NetAction::GetPeer`]
+ Peer(Option),
+ /// response to [`NetAction::GetName`]
+ Name(Option),
+ /// response to [`NetAction::GetDiagnostics`]. A user-readable string.
+ Diagnostics(String),
+}
+
+call_init!(init);
+
+fn init(_our: Address) {
+ let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys"))
+ .body(rmp_serde::to_vec(&NetAction::GetDiagnostics).unwrap())
+ .send_and_await_response(5)
+ else {
+ println!("failed to get diagnostics from networking module");
+ return;
+ };
+ let Ok(NetResponse::Diagnostics(printout)) = rmp_serde::from_slice(&body) else {
+ println!("got malformed response from networking module");
+ return;
+ };
+ println!("{printout}");
+}
diff --git a/kinode/packages/terminal/peer/Cargo.toml b/kinode/packages/terminal/peer/Cargo.toml
new file mode 100644
index 000000000..0446cc120
--- /dev/null
+++ b/kinode/packages/terminal/peer/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "peer"
+version = "0.1.0"
+edition = "2021"
+
+
+[dependencies]
+kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" }
+rmp-serde = "1.1.2"
+serde = { version = "1.0", features = ["derive"] }
+wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" }
+
+[lib]
+crate-type = ["cdylib"]
+
+[package.metadata.component]
+package = "kinode:process"
diff --git a/kinode/packages/terminal/peer/src/lib.rs b/kinode/packages/terminal/peer/src/lib.rs
new file mode 100644
index 000000000..9aa05f430
--- /dev/null
+++ b/kinode/packages/terminal/peer/src/lib.rs
@@ -0,0 +1,83 @@
+use kinode_process_lib::{
+ await_next_request_body, call_init, println, Address, Message, NodeId, Request,
+};
+use serde::{Deserialize, Serialize};
+
+wit_bindgen::generate!({
+ path: "wit",
+ world: "process",
+ exports: {
+ world: Component,
+ },
+});
+
+// types copied from runtime networking core
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Identity {
+ pub name: NodeId,
+ pub networking_key: String,
+ pub ws_routing: Option<(String, u16)>,
+ pub allowed_routers: Vec,
+}
+
+/// Must be parsed from message pack vector.
+/// all Get actions must be sent from local process. used for debugging
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetAction {
+ /// get a list of peers we are connected to
+ GetPeers,
+ /// get the [`Identity`] struct for a single peer
+ GetPeer(String),
+ /// get the [`NodeId`] associated with a given namehash, if any
+ GetName(String),
+ /// get a user-readable diagnostics string containing networking inforamtion
+ GetDiagnostics,
+}
+
+/// For now, only sent in response to a ConnectionRequest.
+/// Must be parsed from message pack vector
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetResponse {
+ Accepted(NodeId),
+ Rejected(NodeId),
+ /// response to [`NetAction::GetPeers`]
+ Peers(Vec),
+ /// response to [`NetAction::GetPeer`]
+ Peer(Option),
+ /// response to [`NetAction::GetName`]
+ Name(Option),
+ /// response to [`NetAction::GetDiagnostics`]. A user-readable string.
+ Diagnostics(String),
+}
+
+call_init!(init);
+
+fn init(_our: Address) {
+ let Ok(args) = await_next_request_body() else {
+ println!("failed to get args, aborting");
+ return;
+ };
+ let Ok(name) = String::from_utf8(args) else {
+ println!("argument must be a string");
+ return;
+ };
+ let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys"))
+ .body(rmp_serde::to_vec(&NetAction::GetPeer(name.clone())).unwrap())
+ .send_and_await_response(5)
+ else {
+ println!("failed to get response from networking module");
+ return;
+ };
+ let Ok(NetResponse::Peer(maybe_peer_id)) = rmp_serde::from_slice(&body) else {
+ println!("got malformed response from networking module");
+ return;
+ };
+ match maybe_peer_id {
+ Some(peer_id) => println!(
+ "peer identity for {}:\n networking key: {}\n routing: {:?}\n routers: {:?}",
+ peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers
+ ),
+ None => println!("no PKI entry found with name {name}"),
+ }
+}
diff --git a/kinode/packages/terminal/peers/Cargo.toml b/kinode/packages/terminal/peers/Cargo.toml
new file mode 100644
index 000000000..269016293
--- /dev/null
+++ b/kinode/packages/terminal/peers/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "peers"
+version = "0.1.0"
+edition = "2021"
+
+
+[dependencies]
+kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" }
+rmp-serde = "1.1.2"
+serde = { version = "1.0", features = ["derive"] }
+wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" }
+
+[lib]
+crate-type = ["cdylib"]
+
+[package.metadata.component]
+package = "kinode:process"
diff --git a/kinode/packages/terminal/peers/src/lib.rs b/kinode/packages/terminal/peers/src/lib.rs
new file mode 100644
index 000000000..bc324e222
--- /dev/null
+++ b/kinode/packages/terminal/peers/src/lib.rs
@@ -0,0 +1,77 @@
+use kinode_process_lib::{call_init, println, Address, Message, NodeId, Request};
+use serde::{Deserialize, Serialize};
+
+wit_bindgen::generate!({
+ path: "wit",
+ world: "process",
+ exports: {
+ world: Component,
+ },
+});
+
+// types copied from runtime networking core
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Identity {
+ pub name: NodeId,
+ pub networking_key: String,
+ pub ws_routing: Option<(String, u16)>,
+ pub allowed_routers: Vec,
+}
+
+/// Must be parsed from message pack vector.
+/// all Get actions must be sent from local process. used for debugging
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetAction {
+ /// get a list of peers we are connected to
+ GetPeers,
+ /// get the [`Identity`] struct for a single peer
+ GetPeer(String),
+ /// get the [`NodeId`] associated with a given namehash, if any
+ GetName(String),
+ /// get a user-readable diagnostics string containing networking inforamtion
+ GetDiagnostics,
+}
+
+/// For now, only sent in response to a ConnectionRequest.
+/// Must be parsed from message pack vector
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NetResponse {
+ Accepted(NodeId),
+ Rejected(NodeId),
+ /// response to [`NetAction::GetPeers`]
+ Peers(Vec),
+ /// response to [`NetAction::GetPeer`]
+ Peer(Option),
+ /// response to [`NetAction::GetName`]
+ Name(Option),
+ /// response to [`NetAction::GetDiagnostics`]. A user-readable string.
+ Diagnostics(String),
+}
+
+call_init!(init);
+
+fn init(_our: Address) {
+ let Ok(Ok(Message::Response { body, .. })) = Request::to(("our", "net", "distro", "sys"))
+ .body(rmp_serde::to_vec(&NetAction::GetPeers).unwrap())
+ .send_and_await_response(5)
+ else {
+ println!("failed to get peers from networking module");
+ return;
+ };
+ let Ok(NetResponse::Peers(identities)) = rmp_serde::from_slice(&body) else {
+ println!("got malformed response from networking module");
+ return;
+ };
+ let identities = identities
+ .iter()
+ .map(|peer_id| {
+ format!(
+ "{}:\n networking key: {}\n routing: {:?}\n routers: {:?}",
+ peer_id.name, peer_id.networking_key, peer_id.ws_routing, peer_id.allowed_routers
+ )
+ })
+ .collect::>()
+ .join("\n");
+ println!("identities of current connected peers:\n{identities}");
+}
diff --git a/kinode/packages/terminal/pkg/scripts.json b/kinode/packages/terminal/pkg/scripts.json
index d555c90ca..6939c3733 100644
--- a/kinode/packages/terminal/pkg/scripts.json
+++ b/kinode/packages/terminal/pkg/scripts.json
@@ -41,6 +41,55 @@
"net:distro:sys"
]
},
+ "m.wasm": {
+ "root": true,
+ "public": true,
+ "request_networking": true
+ },
+ "namehash_to_name.wasm": {
+ "root": false,
+ "public": false,
+ "request_networking": false,
+ "request_capabilities": [
+ "net:distro:sys"
+ ],
+ "grant_capabilities": [
+ "net:distro:sys"
+ ]
+ },
+ "net_diagnostics.wasm": {
+ "root": false,
+ "public": false,
+ "request_networking": false,
+ "request_capabilities": [
+ "net:distro:sys"
+ ],
+ "grant_capabilities": [
+ "net:distro:sys"
+ ]
+ },
+ "peer.wasm": {
+ "root": false,
+ "public": false,
+ "request_networking": false,
+ "request_capabilities": [
+ "net:distro:sys"
+ ],
+ "grant_capabilities": [
+ "net:distro:sys"
+ ]
+ },
+ "peers.wasm": {
+ "root": false,
+ "public": false,
+ "request_networking": false,
+ "request_capabilities": [
+ "net:distro:sys"
+ ],
+ "grant_capabilities": [
+ "net:distro:sys"
+ ]
+ },
"top.wasm": {
"root": false,
"public": false,
@@ -49,10 +98,5 @@
"kernel:distro:sys"
],
"grant_capabilities": []
- },
- "m.wasm": {
- "root": true,
- "public": true,
- "request_networking": true
}
}
\ No newline at end of file
diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs
index cdb3053d8..831eaee1e 100644
--- a/kinode/packages/terminal/terminal/src/lib.rs
+++ b/kinode/packages/terminal/terminal/src/lib.rs
@@ -62,27 +62,43 @@ impl Guest for Component {
aliases: HashMap::from([
(
"alias".to_string(),
- "alias:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("alias"), "terminal", "sys"),
),
(
"cat".to_string(),
- "cat:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("cat"), "terminal", "sys"),
),
(
"echo".to_string(),
- "echo:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("echo"), "terminal", "sys"),
),
(
"hi".to_string(),
- "hi:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("hi"), "terminal", "sys"),
),
(
"m".to_string(),
- "m:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("m"), "terminal", "sys"),
+ ),
+ (
+ "namehash_to_name".to_string(),
+ ProcessId::new(Some("namehash_to_name"), "terminal", "sys"),
+ ),
+ (
+ "net_diagnostics".to_string(),
+ ProcessId::new(Some("net_diagnostics"), "terminal", "sys"),
+ ),
+ (
+ "peer".to_string(),
+ ProcessId::new(Some("peer"), "terminal", "sys"),
+ ),
+ (
+ "peers".to_string(),
+ ProcessId::new(Some("peers"), "terminal", "sys"),
),
(
"top".to_string(),
- "top:terminal:sys".parse::().unwrap(),
+ ProcessId::new(Some("top"), "terminal", "sys"),
),
]),
},
diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs
index 021c02ad5..d7270f7e3 100644
--- a/kinode/src/eth/mod.rs
+++ b/kinode/src/eth/mod.rs
@@ -142,27 +142,6 @@ struct ModuleState {
print_tx: PrintSender,
}
-async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> {
- match Url::parse(&provider.url)?.scheme() {
- "ws" | "wss" => {
- let connector = WsConnect {
- url: provider.url.to_string(),
- auth: None,
- };
- let client = tokio::time::timeout(
- std::time::Duration::from_secs(10),
- ClientBuilder::default().ws(connector),
- )
- .await??;
- provider.pubsub = Some(Provider::new_with_client(client));
- Ok(())
- }
- _ => Err(anyhow::anyhow!(
- "Only `ws://` or `wss://` providers are supported."
- )),
- }
-}
-
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
/// and using them to service indexing requests from other apps. This is the runtime entry point
/// for the entire module.
@@ -674,10 +653,59 @@ async fn handle_eth_config_action(
EthConfigAction::GetAccessSettings => {
return EthConfigResponse::AccessSettings(state.access_settings.clone());
}
+ EthConfigAction::GetState => {
+ return EthConfigResponse::State {
+ active_subscriptions: state
+ .active_subscriptions
+ .iter()
+ .map(|e| {
+ (
+ e.key().clone(),
+ e.value()
+ .iter()
+ .map(|(id, sub)| {
+ (
+ *id,
+ match sub {
+ ActiveSub::Local(_) => None,
+ ActiveSub::Remote { provider_node, .. } => {
+ Some(provider_node.clone())
+ }
+ },
+ )
+ })
+ .collect(),
+ )
+ })
+ .collect(),
+ outstanding_requests: state.response_channels.iter().map(|e| *e.key()).collect(),
+ };
+ }
}
EthConfigResponse::Ok
}
+async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> {
+ match Url::parse(&provider.url)?.scheme() {
+ "ws" | "wss" => {
+ let connector = WsConnect {
+ url: provider.url.to_string(),
+ auth: None,
+ };
+ let client = tokio::time::timeout(
+ std::time::Duration::from_secs(10),
+ ClientBuilder::default().ws(connector),
+ )
+ .await??;
+ provider.pubsub = Some(Provider::new_with_client(client));
+ Ok(())
+ }
+ _ => Err(anyhow::anyhow!(
+ "Only `ws://` or `wss://` providers are supported."
+ )),
+ }
+}
+
fn providers_to_saved_configs(providers: &Providers) -> SavedConfigs {
providers
.iter()
diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs
index c2b6c0fb5..f9813ec0e 100644
--- a/kinode/src/kernel/mod.rs
+++ b/kinode/src/kernel/mod.rs
@@ -860,7 +860,7 @@ pub async fn kernel(
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
- body: rmp_serde::to_vec(&t::NetActions::KnsBatchUpdate(default_pki_entries))
+ body: rmp_serde::to_vec(&t::NetAction::KnsBatchUpdate(default_pki_entries))
.unwrap(),
metadata: None,
capabilities: vec![],
diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs
index 88b46e386..b83b798eb 100644
--- a/kinode/src/net/mod.rs
+++ b/kinode/src/net/mod.rs
@@ -889,19 +889,20 @@ async fn handle_local_message(
Message::Response((response, _context)) => {
// these are received as a router, when we send ConnectionRequests
// to a node we do routing for.
- match rmp_serde::from_slice::(&response.body) {
- Ok(NetResponses::Accepted(_)) => {
+ match rmp_serde::from_slice::(&response.body) {
+ Ok(NetResponse::Accepted(_)) => {
// TODO anything here?
}
- Ok(NetResponses::Rejected(to)) => {
+ Ok(NetResponse::Rejected(to)) => {
// drop from our pending map
// this will drop the socket, causing initiator to see it as failed
pending_passthroughs
.ok_or(anyhow!("got net response as non-router"))?
.remove(&(to, km.source.node));
}
- Err(_) => {
- // this is usually the "delivered" response to a raw message
+ _ => {
+ // this is the "delivered" response to a raw message,
+ // or a response to a Get that was somehow given.. ignore
}
}
return Ok(());
@@ -909,16 +910,16 @@ async fn handle_local_message(
};
if km.source.node != our.name {
- if let Ok(act) = rmp_serde::from_slice::(body) {
+ if let Ok(act) = rmp_serde::from_slice::(body) {
match act {
- NetActions::KnsBatchUpdate(_) | NetActions::KnsUpdate(_) => {
+ NetAction::KnsBatchUpdate(_) | NetAction::KnsUpdate(_) => {
// for now, we don't get these from remote.
}
- NetActions::ConnectionRequest(from) => {
+ NetAction::ConnectionRequest(from) => {
// someone wants to open a passthrough with us through a router!
// if we are an indirect node, and source is one of our routers,
// respond by attempting to init a matching passthrough.
- let res: Result = if our.allowed_routers.contains(&km.source.node)
+ let res: Result = if our.allowed_routers.contains(&km.source.node)
{
let router_id = peers
.get(&km.source.node)
@@ -942,9 +943,9 @@ async fn handle_local_message(
print_tx,
)
.await;
- Ok(NetResponses::Accepted(from.clone()))
+ Ok(NetResponse::Accepted(from.clone()))
} else {
- Ok(NetResponses::Rejected(from.clone()))
+ Ok(NetResponse::Rejected(from.clone()))
};
kernel_message_tx
.send(KernelMessage {
@@ -959,7 +960,7 @@ async fn handle_local_message(
Response {
inherit: false,
body: rmp_serde::to_vec(
- &res.unwrap_or(NetResponses::Rejected(from)),
+ &res.unwrap_or(NetResponse::Rejected(from)),
)?,
metadata: None,
capabilities: vec![],
@@ -970,6 +971,9 @@ async fn handle_local_message(
})
.await?;
}
+ _ => {
+ // we don't accept any other actions from remote
+ }
}
return Ok(());
};
@@ -978,14 +982,12 @@ async fn handle_local_message(
parse_hello_message(our, &km, body, kernel_message_tx, print_tx).await?;
Ok(())
} else {
- // available commands: "peers", "pki", "names", "diagnostics"
- // first parse as raw string, then deserialize to NetActions object
- let mut printout = String::new();
- match rmp_serde::from_slice::(body) {
- Ok(NetActions::ConnectionRequest(_)) => {
+ let maybe_response = match rmp_serde::from_slice::(body)? {
+ NetAction::ConnectionRequest(_) => {
// we shouldn't receive these from ourselves.
+ None
}
- Ok(NetActions::KnsUpdate(log)) => {
+ NetAction::KnsUpdate(log) => {
pki.insert(
log.name.clone(),
Identity {
@@ -1000,8 +1002,9 @@ async fn handle_local_message(
},
);
names.insert(log.node, log.name);
+ None
}
- Ok(NetActions::KnsBatchUpdate(log_list)) => {
+ NetAction::KnsBatchUpdate(log_list) => {
for log in log_list {
pki.insert(
log.name.clone(),
@@ -1018,96 +1021,70 @@ async fn handle_local_message(
);
names.insert(log.node, log.name);
}
+ None
}
- _ => match std::str::from_utf8(body) {
- Ok("peers") => {
+ NetAction::GetPeers => Some(NetResponse::Peers(
+ peers
+ .iter()
+ .map(|p| p.identity.clone())
+ .collect::>(),
+ )),
+ NetAction::GetPeer(peer_name) => Some(NetResponse::Peer(
+ peers.get(&peer_name).map(|p| p.identity.clone()),
+ )),
+ NetAction::GetName(namehash) => {
+ Some(NetResponse::Name(names.get(&namehash).map(|n| n.clone())))
+ }
+ NetAction::GetDiagnostics => {
+ let mut printout = String::new();
+ printout.push_str(&format!(
+ "indexing from contract address {}\r\n",
+ contract_address
+ ));
+ printout.push_str(&format!("our Identity: {:#?}\r\n", our));
+ printout.push_str("we have connections with peers:\r\n");
+ for peer in peers.iter() {
printout.push_str(&format!(
- "{:#?}",
- peers
- .iter()
- .map(|p| p.identity.name.clone())
- .collect::>()
+ " {}, routing_for={}\r\n",
+ peer.identity.name, peer.routing_for,
));
}
- Ok("pki") => {
- printout.push_str(&format!("{:#?}", pki));
- }
- Ok("names") => {
- printout.push_str(&format!("{:#?}", names));
- }
- Ok("diagnostics") => {
+ printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len()));
+ if pending_passthroughs.is_some() {
printout.push_str(&format!(
- "indexing from contract address {}\r\n",
- contract_address
+ "we have {} pending passthrough connections\r\n",
+ pending_passthroughs.unwrap().len()
));
- printout.push_str(&format!("our Identity: {:#?}\r\n", our));
- printout.push_str("we have connections with peers:\r\n");
- for peer in peers.iter() {
- printout.push_str(&format!(
- " {}, routing_for={}\r\n",
- peer.identity.name, peer.routing_for,
- ));
- }
- printout.push_str(&format!("we have {} entries in the PKI\r\n", pki.len()));
- if pending_passthroughs.is_some() {
- printout.push_str(&format!(
- "we have {} pending passthrough connections\r\n",
- pending_passthroughs.unwrap().len()
- ));
- }
- if forwarding_connections.is_some() {
- printout.push_str(&format!(
- "we have {} open passthrough connections\r\n",
- forwarding_connections.unwrap().len()
- ));
- }
- }
- Ok(other) => {
- // parse non-commands as a request to fetch networking data
- // about a specific node name
- printout.push_str(&format!("net: printing known identity for {}\r\n", other));
- match pki.get(other) {
- Some(id) => {
- printout.push_str(&format!("{:#?}", *id));
- }
- None => {
- printout.push_str("no such identity known!");
- }
- }
}
- _ => {}
- },
- }
- if !printout.is_empty() {
- if let Message::Request(req) = km.message {
- if req.expects_response.is_some() {
- kernel_message_tx
- .send(KernelMessage {
- id: km.id,
- source: Address {
- node: our.name.clone(),
- process: ProcessId::new(Some("net"), "distro", "sys"),
- },
- target: km.rsvp.unwrap_or(km.source),
- rsvp: None,
- message: Message::Response((
- Response {
- inherit: false,
- body: printout.clone().into_bytes(),
- metadata: None,
- capabilities: vec![],
- },
- None,
- )),
- lazy_load_blob: None,
- })
- .await?;
+ if forwarding_connections.is_some() {
+ printout.push_str(&format!(
+ "we have {} open passthrough connections\r\n",
+ forwarding_connections.unwrap().len()
+ ));
}
+ Some(NetResponse::Diagnostics(printout))
}
- print_tx
- .send(Printout {
- verbosity: 0,
- content: printout,
+ };
+ if let Some(response_body) = maybe_response {
+ kernel_message_tx
+ .send(KernelMessage {
+ id: km.id,
+ source: Address {
+ node: our.name.clone(),
+ process: ProcessId::new(Some("net"), "distro", "sys"),
+ },
+ target: km.rsvp.unwrap_or(km.source),
+ rsvp: None,
+ message: Message::Response((
+ Response {
+ inherit: false,
+ body: rmp_serde::to_vec(&response_body)?,
+ metadata: None,
+ capabilities: vec![],
+ },
+ None,
+ )),
+ lazy_load_blob: None,
})
.await?;
}
diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs
index 69fbe3c83..5e1ef64bd 100644
--- a/kinode/src/net/utils.rs
+++ b/kinode/src/net/utils.rs
@@ -226,7 +226,7 @@ pub async fn create_passthrough(
message: Message::Request(Request {
inherit: false,
expects_response: Some(5),
- body: rmp_serde::to_vec(&NetActions::ConnectionRequest(from_id.name.clone()))?,
+ body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?,
metadata: None,
capabilities: vec![],
}),
diff --git a/lib/src/core.rs b/lib/src/core.rs
index b05a0f734..5d2353c82 100644
--- a/lib/src/core.rs
+++ b/lib/src/core.rs
@@ -1532,8 +1532,9 @@ pub enum TimerAction {
//
/// Must be parsed from message pack vector.
+/// all Get actions must be sent from local process. used for debugging
#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum NetActions {
+pub enum NetAction {
/// Received from a router of ours when they have a new pending passthrough for us.
/// We should respond (if we desire) by using them to initialize a routed connection
/// with the NodeId given.
@@ -1542,14 +1543,30 @@ pub enum NetActions {
/// in the future could get from remote provider
KnsUpdate(KnsUpdate),
KnsBatchUpdate(Vec),
+ /// get a list of peers we are connected to
+ GetPeers,
+ /// get the [`Identity`] struct for a single peer
+ GetPeer(String),
+ /// get the [`NodeId`] associated with a given namehash, if any
+ GetName(String),
+ /// get a user-readable diagnostics string containing networking inforamtion
+ GetDiagnostics,
}
/// For now, only sent in response to a ConnectionRequest.
/// Must be parsed from message pack vector
#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum NetResponses {
+pub enum NetResponse {
Accepted(NodeId),
Rejected(NodeId),
+ /// response to [`NetAction::GetPeers`]
+ Peers(Vec),
+ /// response to [`NetAction::GetPeer`]
+ Peer(Option),
+ /// response to [`NetAction::GetName`]
+ Name(Option),
+ /// response to [`NetAction::GetDiagnostics`]. A user-readable string.
+ Diagnostics(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/lib/src/eth.rs b/lib/src/eth.rs
index a66900d86..e8026b5eb 100644
--- a/lib/src/eth.rs
+++ b/lib/src/eth.rs
@@ -1,6 +1,6 @@
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
/// The Action and Request type that can be made to eth:distro:sys. Any process with messaging
/// capabilities can send this action to the eth provider.
@@ -110,6 +110,8 @@ pub enum EthConfigAction {
GetProviders,
/// Get the current access settings.
GetAccessSettings,
+ /// Get the state of calls and subscriptions. Used for debugging.
+ GetState,
}
/// Response type from an [`EthConfigAction`] request.
@@ -124,6 +126,11 @@ pub enum EthConfigResponse {
AccessSettings(AccessSettings),
/// Permission denied due to missing capability
PermissionDenied,
+ /// Response from a GetState request
+ State {
+ active_subscriptions: HashMap>>, // None if local, Some(node_provider_name) if remote
+ outstanding_requests: HashSet,
+ },
}
/// Settings for our ETH provider