diff --git a/.env.example b/.env.example deleted file mode 100644 index 0bf65bf99..000000000 --- a/.env.example +++ /dev/null @@ -1,20 +0,0 @@ -### node filesystem config, adjust according to your needs. -### note, will run with default values if no .env is found. - -### for example, if you use remote s3, you might want to set chunk_size to 5mb, and adjust flush_to_cold_interval - -### Default values automatically set (s3 defaults to None) - -# MEM_BUFFER_LIMIT=5242880 # 5mb -# READ_CACHE_LIMIT=5242880 # 5mb -# CHUNK_SIZE=262144 # 256kb -# FLUSH_TO_COLD_INTERVAL=60 # 60s -# ENCRYPTION=true # true -# CLOUD_ENABLED=false # false, controls whether new writes will be to s3 or local - -### Example s3 config -# S3_ACCESS_KEY=minioadmin -# S3_SECRET__KEY=minioadmin -# S3_REGION=eu-north-1 -# S3_BUCKET=mylittlebucket -# S3_ENDPOINT=http://localhost:9000 diff --git a/Cargo.lock b/Cargo.lock index 5a80dfed9..872514bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,7 +78,7 @@ name = "alias" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -93,7 +93,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "alloy-eips" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -104,7 +104,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-primitives", "serde", @@ -115,7 +115,7 @@ dependencies = [ [[package]] name = "alloy-network" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-eips", "alloy-json-rpc", @@ -149,7 +149,7 @@ dependencies = [ [[package]] name = "alloy-providers" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-network", "alloy-primitives", @@ -168,7 +168,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -207,7 +207,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -228,7 +228,7 @@ dependencies = [ [[package]] name = "alloy-rpc-trace-types" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-primitives", "alloy-rpc-types", @@ -239,7 +239,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -282,10 +282,11 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-json-rpc", "base64 0.21.7", + "futures-util", "serde", "serde_json", "thiserror", @@ -298,7 +299,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -311,7 +312,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2" +source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -407,7 +408,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "rand 0.8.5", "serde", "serde_json", @@ -894,7 +895,7 @@ name = "cat" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -956,7 +957,7 @@ dependencies = [ "anyhow", "base64 0.13.1", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "pleco", "serde", "serde_json", @@ -1591,7 +1592,7 @@ name = "download" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -1622,7 +1623,7 @@ name = "echo" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -1822,7 +1823,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "rand 0.8.5", "serde", "serde_json", @@ -1963,6 +1964,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "get_block" +version = "0.1.0" +dependencies = [ + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", + "serde", + "serde_json", + "wit-bindgen", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -2132,7 +2143,7 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" name = "hi" version = "0.1.0" dependencies = [ - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -2162,7 +2173,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -2388,7 +2399,7 @@ name = "install" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -2624,24 +2635,7 @@ dependencies = [ [[package]] name = "kinode_process_lib" version = "0.6.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=12bf9ee#12bf9eefeb9237db5e5165647fa91b437b05e169" -dependencies = [ - "anyhow", - "bincode", - "http 1.0.0", - "mime_guess", - "rand 0.8.5", - "serde", - "serde_json", - "thiserror", - "url", - "wit-bindgen", -] - -[[package]] -name = "kinode_process_lib" -version = "0.6.0" -source = "git+https://github.com/kinode-dao/process_lib?rev=3232423#323242399efdcdad02e7f31bb6a9cc5eec048610" +source = "git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2#f6a2bdab370da88488b210f4dc92b715b9c0e4b2" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -2714,7 +2708,7 @@ dependencies = [ "anyhow", "bincode", "hex", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "rmp-serde", "serde", "serde_json", @@ -2895,7 +2889,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "regex", "serde", "serde_json", @@ -4581,7 +4575,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "rand 0.8.5", "regex", "serde", @@ -4595,7 +4589,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "thiserror", @@ -4609,7 +4603,7 @@ dependencies = [ "anyhow", "bincode", "indexmap", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "thiserror", @@ -4827,7 +4821,7 @@ name = "top" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", @@ -5058,7 +5052,7 @@ name = "uninstall" version = "0.1.0" dependencies = [ "anyhow", - "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=12bf9ee)", + "kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha.2)", "serde", "serde_json", "wit-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 5e8178580..cbd8822cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ members = [ "kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall", "kinode/packages/chess/chess", "kinode/packages/homepage/homepage", - "kinode/packages/kns_indexer/kns_indexer", + "kinode/packages/kns_indexer/kns_indexer", "kinode/packages/kns_indexer/get_block", "kinode/packages/terminal/terminal", "kinode/packages/terminal/alias", "kinode/packages/terminal/cat", "kinode/packages/terminal/echo", "kinode/packages/terminal/hi", "kinode/packages/terminal/m", "kinode/packages/terminal/top", "kinode/packages/tester/tester", "kinode/packages/tester/test_runner", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index c14595431..7b99d4014 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -26,11 +26,11 @@ simulation-mode = [] [dependencies] aes-gcm = "0.10.2" -alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } -alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } -alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56", features = ["ws"]} -alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } -alloy-providers = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } +alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } +alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } +alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4", features = ["ws"]} +alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } +alloy-providers = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } anyhow = "1.0.71" async-trait = "0.1.71" base64 = "0.13" diff --git a/kinode/build.rs b/kinode/build.rs index 4ea37ee56..443c2959d 100644 --- a/kinode/build.rs +++ b/kinode/build.rs @@ -46,6 +46,11 @@ fn build_and_zip_package( } fn main() -> anyhow::Result<()> { + if std::env::var("SKIP_BUILD_SCRIPT").is_ok() { + println!("Skipping build script"); + return Ok(()); + } + let pwd = std::env::current_dir()?; let parent_dir = pwd.parent().unwrap(); let packages_dir = pwd.join("packages"); diff --git a/kinode/default_providers_mainnet.json b/kinode/default_providers_mainnet.json index 46f041c27..b83f72e7a 100644 --- a/kinode/default_providers_mainnet.json +++ b/kinode/default_providers_mainnet.json @@ -1,29 +1,63 @@ [ { - "name": "default-router-1.os", - "owner": "", - "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d", - "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db", - "ip": "147.135.114.167", - "port": 9005, - "routers": [] + "chain_id": 1, + "trusted": false, + "provider": { + "RpcUrl": "wss://ethereum.publicnode.com" + } }, { - "name": "default-router-2.os", - "owner": "", - "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458", - "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992", - "ip": "147.135.114.167", - "port": 9006, - "routers": [] + "chain_id": 10, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-1.os", + "owner": "", + "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d", + "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db", + "ip": "147.135.114.167", + "port": 9005, + "routers": [] + } + } + } }, { - "name": "default-router-3.os", - "owner": "", - "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a", - "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b", - "ip": "147.135.114.167", - "port": 9007, - "routers": [] + "chain_id": 10, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-2.os", + "owner": "", + "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458", + "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992", + "ip": "147.135.114.167", + "port": 9006, + "routers": [] + } + } + } + }, + { + "chain_id": 10, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-3.os", + "owner": "", + "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a", + "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b", + "ip": "147.135.114.167", + "port": 9007, + "routers": [] + } + } + } } ] \ No newline at end of file diff --git a/kinode/default_providers_testnet.json b/kinode/default_providers_testnet.json index af47dce33..528abfd41 100644 --- a/kinode/default_providers_testnet.json +++ b/kinode/default_providers_testnet.json @@ -1,29 +1,63 @@ [ { - "name": "default-router-1.os", - "owner": "", - "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d", - "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db", - "ip": "147.135.114.167", - "port": 9002, - "routers": [] + "chain_id": 1, + "trusted": false, + "provider": { + "RpcUrl": "wss://ethereum.publicnode.com" + } }, { - "name": "default-router-2.os", - "owner": "", - "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458", - "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992", - "ip": "147.135.114.167", - "port": 9003, - "routers": [] + "chain_id": 11155111, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-1.os", + "owner": "", + "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d", + "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db", + "ip": "147.135.114.167", + "port": 9002, + "routers": [] + } + } + } }, { - "name": "default-router-3.os", - "owner": "", - "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a", - "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b", - "ip": "147.135.114.167", - "port": 9004, - "routers": [] + "chain_id": 11155111, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-2.os", + "owner": "", + "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458", + "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992", + "ip": "147.135.114.167", + "port": 9003, + "routers": [] + } + } + } + }, + { + "chain_id": 11155111, + "trusted": false, + "provider": { + "Node": { + "use_as_provider": true, + "kns_update": { + "name": "default-router-3.os", + "owner": "", + "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a", + "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b", + "ip": "147.135.114.167", + "port": 9004, + "routers": [] + } + } + } } ] \ No newline at end of file diff --git a/kinode/packages/app_store/app_store/Cargo.toml b/kinode/packages/app_store/app_store/Cargo.toml index f34a470c9..13f182c46 100644 --- a/kinode/packages/app_store/app_store/Cargo.toml +++ b/kinode/packages/app_store/app_store/Cargo.toml @@ -9,7 +9,7 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/app_store/app_store/src/lib.rs b/kinode/packages/app_store/app_store/src/lib.rs index 6965c1265..771f558f5 100644 --- a/kinode/packages/app_store/app_store/src/lib.rs +++ b/kinode/packages/app_store/app_store/src/lib.rs @@ -1,7 +1,3 @@ -use kinode_process_lib::eth::{ - get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, EthSubResult, Filter, - SubscriptionResult, -}; use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest}; use kinode_process_lib::kernel_types as kt; use kinode_process_lib::*; @@ -42,7 +38,8 @@ use ft_worker_lib::{ /// - uninstalled + deleted /// - set to automatically update if a new version is available -const CONTRACT_ADDRESS: &str = "0x18c39eB547A0060C6034f8bEaFB947D1C16eADF1"; +const CHAIN_ID: u64 = 11155111; // sepolia +const CONTRACT_ADDRESS: &str = "0x18c39eB547A0060C6034f8bEaFB947D1C16eADF1"; // sepolia const EVENTS: [&str; 3] = [ "AppRegistered(uint256,string,bytes,string,bytes32)", @@ -59,7 +56,7 @@ pub enum Req { RemoteRequest(RemoteRequest), FTWorkerCommand(FTWorkerCommand), FTWorkerResult(FTWorkerResult), - Eth(EthSubResult), + Eth(eth::EthSubResult), Http(HttpServerRequest), } @@ -71,6 +68,33 @@ pub enum Resp { FTWorkerResult(FTWorkerResult), } +fn fetch_logs(eth_provider: ð::Provider, filter: ð::Filter) -> Vec { + loop { + match eth_provider.get_logs(filter) { + Ok(res) => return res, + Err(_) => { + println!("app store: failed to fetch logs! trying again in 5s..."); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + } + } +} + +fn subscribe_to_logs(eth_provider: ð::Provider, filter: eth::Filter) { + loop { + match eth_provider.subscribe(1, filter.clone()) { + Ok(()) => break, + Err(_) => { + println!("app store: failed to subscribe to chain! trying again in 5s..."); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + } + } + println!("app store: subscribed to logs successfully"); +} + call_init!(init); fn init(our: Address) { println!("{}: started", our.package()); @@ -109,23 +133,25 @@ fn init(our: Address) { state.contract_address ); + // create new provider for sepolia with request-timeout of 60s + // can change, log requests can take quite a long time. + let eth_provider = eth::Provider::new(CHAIN_ID, 60); + let mut requested_packages: HashMap = HashMap::new(); // get past logs, subscribe to new ones. - let filter = Filter::new() - .address(EthAddress::from_str(&state.contract_address).unwrap()) + let filter = eth::Filter::new() + .address(eth::Address::from_str(&state.contract_address).unwrap()) .from_block(state.last_saved_block - 1) + .to_block(eth::BlockNumberOrTag::Latest) .events(EVENTS); - let logs = get_logs(&filter); - - if let Ok(logs) = logs { - for log in logs { - state.ingest_listings_contract_event(&our, log); - } + for log in fetch_logs(ð_provider, &filter) { + if let Err(e) = state.ingest_listings_contract_event(&our, log) { + println!("app store: error ingesting log: {e:?}"); + }; } - - subscribe(1, filter).unwrap(); + subscribe_to_logs(ð_provider, filter); loop { match await_message() { @@ -134,8 +160,13 @@ fn init(our: Address) { println!("app store: got network error: {send_error}"); } Ok(message) => { - if let Err(e) = handle_message(&our, &mut state, &mut requested_packages, &message) - { + if let Err(e) = handle_message( + &our, + &mut state, + ð_provider, + &mut requested_packages, + &message, + ) { println!("app store: error handling message: {:?}", e) } } @@ -150,6 +181,7 @@ fn init(our: Address) { fn handle_message( our: &Address, mut state: &mut State, + eth_provider: ð::Provider, mut requested_packages: &mut HashMap, message: &Message, ) -> anyhow::Result<()> { @@ -164,8 +196,13 @@ fn handle_message( if our.node != source.node { return Err(anyhow::anyhow!("local request from non-local node")); } - let resp = - handle_local_request(&our, &local_request, &mut state, &mut requested_packages); + let resp = handle_local_request( + &our, + &local_request, + &mut state, + eth_provider, + &mut requested_packages, + ); if expects_response.is_some() { Response::new().body(serde_json::to_vec(&resp)?).send()?; } @@ -189,10 +226,19 @@ fn handle_message( if source.node() != our.node() || source.process != "eth:distro:sys" { return Err(anyhow::anyhow!("eth sub event from weird addr: {source}")); } - if let Ok(EthSub { result, .. }) = eth_result { + if let Ok(eth::EthSub { result, .. }) = eth_result { handle_eth_sub_event(our, &mut state, result)?; } else { - println!("app store: got eth sub error: {eth_result:?}"); + println!("app store: got eth subscription error"); + // attempt to resubscribe + subscribe_to_logs( + ð_provider, + eth::Filter::new() + .address(eth::Address::from_str(&state.contract_address).unwrap()) + .from_block(state.last_saved_block - 1) + .to_block(eth::BlockNumberOrTag::Latest) + .events(EVENTS), + ); } } Req::Http(incoming) => { @@ -269,6 +315,7 @@ fn handle_local_request( our: &Address, request: &LocalRequest, state: &mut State, + eth_provider: ð::Provider, requested_packages: &mut HashMap, ) -> LocalResponse { match request { @@ -338,22 +385,21 @@ fn handle_local_request( LocalRequest::RebuildIndex => { *state = State::new(CONTRACT_ADDRESS.to_string()).unwrap(); // kill our old subscription and build a new one. - unsubscribe(1).unwrap(); + eth_provider + .unsubscribe(1) + .expect("app_store: failed to unsub from eth events!"); - let filter = Filter::new() - .address(EthAddress::from_str(&state.contract_address).unwrap()) + let filter = eth::Filter::new() + .address(eth::Address::from_str(&state.contract_address).unwrap()) .from_block(state.last_saved_block - 1) .events(EVENTS); - let logs = get_logs(&filter); - - if let Ok(logs) = logs { - for log in logs { - state.ingest_listings_contract_event(our, log); - } + for log in fetch_logs(ð_provider, &filter) { + if let Err(e) = state.ingest_listings_contract_event(our, log) { + println!("app store: error ingesting log: {e:?}"); + }; } - subscribe(1, filter).unwrap(); - + subscribe_to_logs(ð_provider, filter); LocalResponse::RebuiltIndex } } @@ -542,9 +588,9 @@ fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> { fn handle_eth_sub_event( our: &Address, state: &mut State, - event: SubscriptionResult, + event: eth::SubscriptionResult, ) -> anyhow::Result<()> { - let SubscriptionResult::Log(log) = event else { + let eth::SubscriptionResult::Log(log) = event else { return Err(anyhow::anyhow!("app store: got non-log event")); }; state.ingest_listings_contract_event(our, *log) diff --git a/kinode/packages/app_store/app_store/src/types.rs b/kinode/packages/app_store/app_store/src/types.rs index 1d6adea24..f4a49e47e 100644 --- a/kinode/packages/app_store/app_store/src/types.rs +++ b/kinode/packages/app_store/app_store/src/types.rs @@ -206,6 +206,7 @@ impl State { let manifest_file = vfs::File { path: format!("/{}/pkg/manifest.json", package_id), + timeout: 5, }; let manifest_bytes = manifest_file.read()?; let manifest_hash = generate_metadata_hash(&manifest_bytes); @@ -284,6 +285,7 @@ impl State { if entry.file_type == vfs::FileType::Directory { let zip_file = vfs::File { path: format!("/{}/pkg/{}.zip", package_id, package_id), + timeout: 5, }; let Ok(zip_file_bytes) = zip_file.read() else { continue; @@ -293,6 +295,7 @@ impl State { let our_version = generate_version_hash(&zip_file_bytes); let manifest_file = vfs::File { path: format!("/{}/pkg/manifest.json", package_id), + timeout: 5, }; let manifest_bytes = manifest_file.read()?; // the user will need to turn mirroring and auto-update back on if they diff --git a/kinode/packages/app_store/download/Cargo.toml b/kinode/packages/app_store/download/Cargo.toml index 07bb716df..75e7ca1f5 100644 --- a/kinode/packages/app_store/download/Cargo.toml +++ b/kinode/packages/app_store/download/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/app_store/ft_worker/Cargo.toml b/kinode/packages/app_store/ft_worker/Cargo.toml index 3171678b8..7ec470781 100644 --- a/kinode/packages/app_store/ft_worker/Cargo.toml +++ b/kinode/packages/app_store/ft_worker/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/app_store/install/Cargo.toml b/kinode/packages/app_store/install/Cargo.toml index 28f5e6e15..f12bf75d5 100644 --- a/kinode/packages/app_store/install/Cargo.toml +++ b/kinode/packages/app_store/install/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/app_store/pkg/manifest.json b/kinode/packages/app_store/pkg/manifest.json index 5112319d4..705a80667 100644 --- a/kinode/packages/app_store/pkg/manifest.json +++ b/kinode/packages/app_store/pkg/manifest.json @@ -13,6 +13,12 @@ "vfs:distro:sys", "kernel:distro:sys", "eth:distro:sys", + { + "process": "eth:distro:sys", + "params": { + "root": true + } + }, "sqlite:distro:sys", "kv:distro:sys", "chess:chess:sys", diff --git a/kinode/packages/app_store/uninstall/Cargo.toml b/kinode/packages/app_store/uninstall/Cargo.toml index 020dc4917..f7ebb6b54 100644 --- a/kinode/packages/app_store/uninstall/Cargo.toml +++ b/kinode/packages/app_store/uninstall/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/chess/chess/Cargo.toml b/kinode/packages/chess/chess/Cargo.toml index 385da11d5..255dd1754 100644 --- a/kinode/packages/chess/chess/Cargo.toml +++ b/kinode/packages/chess/chess/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" anyhow = "1.0" base64 = "0.13" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } pleco = "0.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/homepage/homepage/Cargo.toml b/kinode/packages/homepage/homepage/Cargo.toml index bb9a8e409..d9d0d1c4f 100644 --- a/kinode/packages/homepage/homepage/Cargo.toml +++ b/kinode/packages/homepage/homepage/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/kns_indexer/get_block/Cargo.toml b/kinode/packages/kns_indexer/get_block/Cargo.toml new file mode 100644 index 000000000..8ae30de57 --- /dev/null +++ b/kinode/packages/kns_indexer/get_block/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "get_block" +version = "0.1.0" +edition = "2021" + + +[dependencies] +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } + +[lib] +crate-type = ["cdylib"] + +[package.metadata.component] +package = "kinode:process" diff --git a/kinode/packages/kns_indexer/get_block/src/lib.rs b/kinode/packages/kns_indexer/get_block/src/lib.rs new file mode 100644 index 000000000..11c9c2356 --- /dev/null +++ b/kinode/packages/kns_indexer/get_block/src/lib.rs @@ -0,0 +1,36 @@ +use kinode_process_lib::{await_next_request_body, call_init, eth, println, Address}; + +wit_bindgen::generate!({ + path: "wit", + world: "process", + exports: { + world: Component, + }, +}); + +call_init!(init); + +fn init(_our: Address) { + let Ok(args) = await_next_request_body() else { + println!("get_block: failed to get args, aborting"); + return; + }; + + // incoming args bytes are a string we parse to u64, if none provided, default to 1 + let chain_id = std::str::from_utf8(&args) + .unwrap_or("1") + .parse::() + .unwrap_or(1); + + // request timeout of 5s + let provider = eth::Provider::new(chain_id, 5); + + match provider.get_block_number() { + Ok(block_number) => { + println!("latest block number: {block_number}"); + } + Err(e) => { + println!("failed to get block number: {e:?}"); + } + } +} diff --git a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml index 285e7fd49..8edb58bde 100644 --- a/kinode/packages/kns_indexer/kns_indexer/Cargo.toml +++ b/kinode/packages/kns_indexer/kns_indexer/Cargo.toml @@ -10,7 +10,7 @@ alloy-primitives = "0.6.2" alloy-sol-types = "0.6.2" bincode = "1.3.3" hex = "0.4.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } rmp-serde = "1.1.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs index d22c9d436..58b4cdf41 100644 --- a/kinode/packages/kns_indexer/kns_indexer/src/lib.rs +++ b/kinode/packages/kns_indexer/kns_indexer/src/lib.rs @@ -1,12 +1,6 @@ use alloy_sol_types::{sol, SolEvent}; - use kinode_process_lib::{ - await_message, - eth::{ - get_block_number, get_logs, subscribe, Address as EthAddress, BlockNumberOrTag, EthSub, - EthSubResult, Filter, Log, SubscriptionResult, - }, - get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response, + await_message, eth, get_typed_state, println, set_state, Address, Message, Request, Response, }; use serde::{Deserialize, Serialize}; use std::collections::{ @@ -25,14 +19,15 @@ wit_bindgen::generate!({ #[derive(Clone, Debug, Serialize, Deserialize)] struct State { + chain_id: u64, // what contract this state pertains to - contract_address: Option, + contract_address: String, // namehash to human readable name names: HashMap, // human readable name to most recent on-chain routing information as json // NOTE: not every namehash will have a node registered nodes: HashMap, - // last block we read from + // last block we have an update from block: u64, } @@ -99,25 +94,74 @@ sol! { event RoutingUpdate(bytes32 indexed node, bytes32[] routers); } +fn subscribe_to_logs(eth_provider: ð::Provider, from_block: u64, filter: eth::Filter) { + loop { + match eth_provider.subscribe(1, filter.clone().from_block(from_block)) { + Ok(()) => break, + Err(_) => { + println!("kns_indexer: failed to subscribe to chain! trying again in 5s..."); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + } + } + println!("kns_indexer: subscribed to logs successfully"); +} + struct Component; impl Guest for Component { fn init(our: String) { let our: Address = our.parse().unwrap(); - let mut state: State = State { - contract_address: None, - names: HashMap::new(), - nodes: HashMap::new(), - block: 1, - }; + // first, await a message from the kernel which will contain the + // chain ID and contract address for the KNS version we want to track. + let chain_id: u64; + let contract_address: String; + loop { + let Ok(Message::Request { source, body, .. }) = await_message() else { + continue; + }; + if source.process != "kernel:distro:sys" { + continue; + } + (chain_id, contract_address) = serde_json::from_slice(&body).unwrap(); + break; + } + println!( + "kns_indexer: indexing on contract address {}", + contract_address + ); // if we have state, load it in - match get_typed_state(|bytes| Ok(bincode::deserialize(bytes)?)) { + let state: State = match get_typed_state(|bytes| Ok(bincode::deserialize::(bytes)?)) + { Some(s) => { - state = s; + // if chain id or contract address changed from a previous run, reset state + if s.chain_id != chain_id || s.contract_address != contract_address { + println!("kns_indexer: resetting state because runtime contract address or chain ID changed"); + State { + chain_id, + contract_address, + names: HashMap::new(), + nodes: HashMap::new(), + block: 1, + } + } else { + println!( + "kns_indexer: loading in {} persisted PKI entries", + s.nodes.len() + ); + s + } } - None => {} - } + None => State { + chain_id, + contract_address: contract_address.clone(), + names: HashMap::new(), + nodes: HashMap::new(), + block: 1, + }, + }; match main(our, state) { Ok(_) => {} @@ -129,34 +173,6 @@ impl Guest for Component { } fn main(our: Address, mut state: State) -> anyhow::Result<()> { - // first, await a message from the kernel which will contain the - // contract address for the KNS version we want to track. - let mut contract_address: Option = None; - loop { - let Ok(Message::Request { source, body, .. }) = await_message() else { - continue; - }; - if source.process != "kernel:distro:sys" { - continue; - } - contract_address = Some(std::str::from_utf8(&body).unwrap().to_string()); - break; - } - println!( - "kns_indexer: indexing on contract address {}", - contract_address.as_ref().unwrap() - ); - // if contract address changed from a previous run, reset state - if state.contract_address != contract_address { - println!("kns_indexer: contract address changed, re-indexing"); - state = State { - contract_address: contract_address.clone(), - names: HashMap::new(), - nodes: HashMap::new(), - block: 1, - }; - } - // shove all state into net::net Request::new() .target((&our.node, "net", "distro", "sys")) @@ -165,10 +181,10 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { ))? .send()?; - let filter = Filter::new() - .address(contract_address.unwrap().parse::().unwrap()) - .to_block(BlockNumberOrTag::Latest) + let filter = eth::Filter::new() + .address(state.contract_address.parse::().unwrap()) .from_block(state.block - 1) + .to_block(eth::BlockNumberOrTag::Latest) .events(vec![ "NodeRegistered(bytes32,bytes)", "KeyUpdate(bytes32,bytes32)", @@ -177,11 +193,26 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { "RoutingUpdate(bytes32,bytes32[])", ]); + // 60s timeout -- these calls can take a long time + // if they do time out, we try them again + let eth_provider = eth::Provider::new(state.chain_id, 60); + // if block in state is < current_block, get logs from that part. - if state.block < get_block_number()? { - let logs = get_logs(&filter)?; - for log in logs { - handle_log(&our, &mut state, &log)?; + if state.block < eth_provider.get_block_number().unwrap_or(u64::MAX) { + loop { + match eth_provider.get_logs(&filter) { + Ok(logs) => { + for log in logs { + handle_log(&our, &mut state, &log)?; + } + break; + } + Err(_) => { + println!("kns_indexer: failed to fetch logs! trying again in 5s..."); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + } } } // shove all state into net::net @@ -194,7 +225,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { set_state(&bincode::serialize(&state)?); - subscribe(1, filter.clone())?; + subscribe_to_logs(ð_provider, state.block - 1, filter.clone()); let mut pending_requests: BTreeMap> = BTreeMap::new(); @@ -210,7 +241,14 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { }; if source.process == "eth:distro:sys" { - handle_eth_message(&our, &mut state, &mut pending_requests, &body, &filter)?; + handle_eth_message( + &our, + &mut state, + ð_provider, + &mut pending_requests, + &body, + &filter, + )?; } else { let Ok(request) = serde_json::from_slice::(&body) else { println!("kns_indexer: got invalid message"); @@ -250,23 +288,24 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> { fn handle_eth_message( our: &Address, state: &mut State, + eth_provider: ð::Provider, pending_requests: &mut BTreeMap>, body: &[u8], - filter: &Filter, + filter: ð::Filter, ) -> anyhow::Result<()> { - let Ok(eth_result) = serde_json::from_slice::(body) else { + let Ok(eth_result) = serde_json::from_slice::(body) else { return Err(anyhow::anyhow!("kns_indexer: got invalid message")); }; match eth_result { - Ok(EthSub { result, .. }) => { - if let SubscriptionResult::Log(log) = result { + Ok(eth::EthSub { result, .. }) => { + if let eth::SubscriptionResult::Log(log) = result { handle_log(our, state, &log)?; } } - Err(e) => { - println!("kns_indexer: got sub error, resubscribing.. {:?}", e.error); - subscribe(1, filter.clone())?; + Err(_e) => { + println!("kns_indexer: got eth subscription error"); + subscribe_to_logs(ð_provider, state.block - 1, filter.clone()); } } @@ -304,9 +343,7 @@ fn handle_eth_message( Ok(()) } -fn handle_log(our: &Address, state: &mut State, log: &Log) -> anyhow::Result<()> { - state.block = log.block_number.expect("expect").to::(); - +fn handle_log(our: &Address, state: &mut State, log: ð::Log) -> anyhow::Result<()> { let node_id = log.topics[1]; let name = match state.names.entry(node_id.to_string()) { @@ -368,22 +405,27 @@ fn handle_log(our: &Address, state: &mut State, log: &Log) -> anyhow::Result<()> && ((node.ip != "" && node.port != 0) || node.routers.len() > 0) && send { - print_to_terminal( - 1, - &format!( - "kns_indexer: sending ID to net: {node:?} (blocknum {})", - state.block - ), - ); Request::new() .target((&our.node, "net", "distro", "sys")) .try_body(NetActions::KnsUpdate(node.clone()))? .send()?; } + + // if new block is > 100 from last block, save state + let block = log.block_number.expect("expect").to::(); + if block > state.block + 100 { + println!( + "kns_indexer: persisting {} PKI entries at block {}", + state.nodes.len(), + block + ); + state.block = block; + set_state(&bincode::serialize(state)?); + } Ok(()) } -fn get_name(log: &Log) -> String { +fn get_name(log: ð::Log) -> String { let decoded = NodeRegistered::abi_decode_data(&log.data, true).unwrap(); let name = match dnswire_decode(decoded.0.clone()) { Ok(n) => n, diff --git a/kinode/packages/kns_indexer/pkg/scripts.json b/kinode/packages/kns_indexer/pkg/scripts.json new file mode 100644 index 000000000..8d4119cab --- /dev/null +++ b/kinode/packages/kns_indexer/pkg/scripts.json @@ -0,0 +1,13 @@ +{ + "get_block.wasm": { + "root": false, + "public": false, + "request_networking": false, + "request_capabilities": [ + "eth:distro:sys" + ], + "grant_capabilities": [ + "eth:distro:sys" + ] + } +} \ No newline at end of file diff --git a/kinode/packages/terminal/alias/Cargo.toml b/kinode/packages/terminal/alias/Cargo.toml index 7213f3e2a..318269cc7 100644 --- a/kinode/packages/terminal/alias/Cargo.toml +++ b/kinode/packages/terminal/alias/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/terminal/cat/Cargo.toml b/kinode/packages/terminal/cat/Cargo.toml index f52ebb846..a5dbc8113 100644 --- a/kinode/packages/terminal/cat/Cargo.toml +++ b/kinode/packages/terminal/cat/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/terminal/cat/src/lib.rs b/kinode/packages/terminal/cat/src/lib.rs index 373a2b79f..e538dddc4 100644 --- a/kinode/packages/terminal/cat/src/lib.rs +++ b/kinode/packages/terminal/cat/src/lib.rs @@ -1,5 +1,5 @@ use kinode_process_lib::{ - await_next_request_body, call_init, get_blob, println, vfs, Address, Request, Response, + await_next_request_body, call_init, get_blob, println, vfs, Address, Request, }; wit_bindgen::generate!({ diff --git a/kinode/packages/terminal/echo/Cargo.toml b/kinode/packages/terminal/echo/Cargo.toml index 79c526172..36f9a32a1 100644 --- a/kinode/packages/terminal/echo/Cargo.toml +++ b/kinode/packages/terminal/echo/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/terminal/hi/Cargo.toml b/kinode/packages/terminal/hi/Cargo.toml index 2fcb9145b..7bcec5c3d 100644 --- a/kinode/packages/terminal/hi/Cargo.toml +++ b/kinode/packages/terminal/hi/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/terminal/m/Cargo.toml b/kinode/packages/terminal/m/Cargo.toml index a0f3080a8..372e286b5 100644 --- a/kinode/packages/terminal/m/Cargo.toml +++ b/kinode/packages/terminal/m/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" clap = "4.4.18" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } regex = "1.10.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/terminal/m/src/lib.rs b/kinode/packages/terminal/m/src/lib.rs index 4a0d8db0b..c202d64cb 100644 --- a/kinode/packages/terminal/m/src/lib.rs +++ b/kinode/packages/terminal/m/src/lib.rs @@ -1,6 +1,6 @@ use clap::{Arg, Command}; use kinode_process_lib::{ - await_next_request_body, call_init, println, Address, Request, Response, SendErrorKind, + await_next_request_body, call_init, println, Address, Request, SendErrorKind, }; use regex::Regex; diff --git a/kinode/packages/terminal/pkg/manifest.json b/kinode/packages/terminal/pkg/manifest.json index 7c787eb08..72003cee3 100644 --- a/kinode/packages/terminal/pkg/manifest.json +++ b/kinode/packages/terminal/pkg/manifest.json @@ -12,6 +12,12 @@ "kernel:distro:sys", "vfs:distro:sys", "eth:distro:sys", + { + "process": "eth:distro:sys", + "params": { + "root": true + } + }, "sqlite:distro:sys", "kv:distro:sys", "chess:chess:sys", diff --git a/kinode/packages/terminal/terminal/Cargo.toml b/kinode/packages/terminal/terminal/Cargo.toml index 1980a8004..a29805a35 100644 --- a/kinode/packages/terminal/terminal/Cargo.toml +++ b/kinode/packages/terminal/terminal/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } rand = "0.8" regex = "1.10.3" serde = { version = "1.0", features = ["derive"] } diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs index 6c924b925..03994985e 100644 --- a/kinode/packages/terminal/terminal/src/lib.rs +++ b/kinode/packages/terminal/terminal/src/lib.rs @@ -5,7 +5,6 @@ use kinode_process_lib::{ get_blob, get_typed_state, our_capabilities, print_to_terminal, println, set_state, vfs, Address, Capability, ProcessId, Request, }; -use regex::Regex; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; diff --git a/kinode/packages/terminal/top/Cargo.toml b/kinode/packages/terminal/top/Cargo.toml index ccf37c9b8..1bfff6883 100644 --- a/kinode/packages/terminal/top/Cargo.toml +++ b/kinode/packages/terminal/top/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" } diff --git a/kinode/packages/tester/test_runner/Cargo.toml b/kinode/packages/tester/test_runner/Cargo.toml index 13af06401..d4423d2b6 100644 --- a/kinode/packages/tester/test_runner/Cargo.toml +++ b/kinode/packages/tester/test_runner/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/kinode/packages/tester/test_runner/src/lib.rs b/kinode/packages/tester/test_runner/src/lib.rs index b4ce5b399..e29d502d2 100644 --- a/kinode/packages/tester/test_runner/src/lib.rs +++ b/kinode/packages/tester/test_runner/src/lib.rs @@ -84,7 +84,7 @@ fn handle_message(our: &Address) -> anyhow::Result<()> { None => std::collections::HashMap::new(), Some(caps_index) => { children.remove(caps_index); - let file = vfs::file::open_file(&caps_file_path, false)?; + let file = vfs::file::open_file(&caps_file_path, false, None)?; let file_contents = file.read()?; serde_json::from_slice(&file_contents)? } diff --git a/kinode/packages/tester/tester/Cargo.toml b/kinode/packages/tester/tester/Cargo.toml index 49e2095ee..69ce7280c 100644 --- a/kinode/packages/tester/tester/Cargo.toml +++ b/kinode/packages/tester/tester/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" anyhow = "1.0" bincode = "1.3.3" indexmap = "2.1" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha.2" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index 6257bab40..4e9a3296e 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -1,4 +1,811 @@ -#![allow(unused)] -pub mod provider; +use alloy_providers::provider::Provider; +use alloy_pubsub::PubSubFrontend; +use alloy_rpc_client::ClientBuilder; +use alloy_transport_ws::WsConnect; +use anyhow::Result; +use dashmap::DashMap; +use lib::types::core::*; +use lib::types::eth::*; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use url::Url; -pub use lib::types::eth as types; +mod subscription; + +/// meta-type for all incoming requests we need to handle +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +enum IncomingReq { + EthAction(EthAction), + EthConfigAction(EthConfigAction), + EthSubResult(EthSubResult), + SubKeepalive(u64), +} + +/// mapping of chain id to ordered lists of providers +type Providers = Arc>; + +#[derive(Debug)] +struct ActiveProviders { + pub urls: Vec, + pub nodes: Vec, +} + +#[derive(Debug)] +struct UrlProvider { + pub trusted: bool, + pub url: String, + pub pubsub: Option>, +} + +#[derive(Debug)] +struct NodeProvider { + pub trusted: bool, + /// semi-temporary flag to mark if this provider is currently usable + /// future updates will make this more dynamic + pub usable: bool, + pub name: String, +} + +impl ActiveProviders { + fn add_provider_config(&mut self, new: ProviderConfig) { + match new.provider { + NodeOrRpcUrl::Node { + kns_update, + use_as_provider, + } => { + self.nodes.push(NodeProvider { + trusted: new.trusted, + usable: use_as_provider, + name: kns_update.name, + }); + } + NodeOrRpcUrl::RpcUrl(url) => { + self.urls.push(UrlProvider { + trusted: new.trusted, + url, + pubsub: None, + }); + } + } + } + + fn remove_provider(&mut self, remove: &str) { + self.urls.retain(|x| x.url != remove); + self.nodes.retain(|x| x.name != remove); + } +} + +/// existing subscriptions held by local OR remote processes +type ActiveSubscriptions = Arc>>; + +type ResponseChannels = Arc>; + +#[derive(Debug)] +enum ActiveSub { + Local(JoinHandle<()>), + Remote { + provider_node: String, + handle: JoinHandle<()>, + sender: tokio::sync::mpsc::Sender, + }, +} + +impl ActiveSub { + async fn close(&self, sub_id: u64, state: &ModuleState) { + match self { + ActiveSub::Local(handle) => { + handle.abort(); + } + ActiveSub::Remote { + provider_node, + handle, + .. + } => { + // tell provider node we don't need their services anymore + kernel_message( + &state.our, + rand::random(), + Address { + node: provider_node.clone(), + process: ETH_PROCESS_ID.clone(), + }, + None, + true, + None, + EthAction::UnsubscribeLogs(sub_id), + &state.send_to_loop, + ) + .await; + handle.abort(); + } + } + } +} + +struct ModuleState { + /// the name of this node + our: Arc, + /// the access settings for this provider + access_settings: AccessSettings, + /// the set of providers we have available for all chains + providers: Providers, + /// the set of active subscriptions we are currently maintaining + active_subscriptions: ActiveSubscriptions, + /// the set of response channels we have open for outstanding request tasks + response_channels: ResponseChannels, + /// our sender for kernel event loop + send_to_loop: MessageSender, + /// our sender for terminal prints + print_tx: PrintSender, +} + +async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> { + match Url::parse(&provider.url)?.scheme() { + "ws" | "wss" => { + let connector = WsConnect { + url: provider.url.to_string(), + auth: None, + }; + let client = tokio::time::timeout( + std::time::Duration::from_secs(10), + ClientBuilder::default().ws(connector), + ) + .await??; + provider.pubsub = Some(Provider::new_with_client(client)); + Ok(()) + } + _ => Err(anyhow::anyhow!( + "Only `ws://` or `wss://` providers are supported." + )), + } +} + +/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers +/// and using them to service indexing requests from other apps. This is the runtime entry point +/// for the entire module. +pub async fn provider( + our: String, + configs: SavedConfigs, + send_to_loop: MessageSender, + mut recv_in_client: MessageReceiver, + mut net_error_recv: NetworkErrorReceiver, + caps_oracle: CapMessageSender, + print_tx: PrintSender, +) -> Result<()> { + let mut state = ModuleState { + our: Arc::new(our), + access_settings: AccessSettings { + public: false, + allow: HashSet::new(), + deny: HashSet::new(), + }, + providers: Arc::new(DashMap::new()), + active_subscriptions: Arc::new(DashMap::new()), + response_channels: Arc::new(DashMap::new()), + send_to_loop, + print_tx, + }; + + // convert saved configs into data structure that we will use to route queries + for entry in configs { + let mut ap = state + .providers + .entry(entry.chain_id) + .or_insert(ActiveProviders { + urls: vec![], + nodes: vec![], + }); + ap.add_provider_config(entry); + } + + verbose_print(&state.print_tx, "eth: provider initialized").await; + + loop { + tokio::select! { + Some(wrapped_error) = net_error_recv.recv() => { + handle_network_error( + wrapped_error, + &state.active_subscriptions, + &state.response_channels, + &state.print_tx + ).await; + } + Some(km) = recv_in_client.recv() => { + let km_id = km.id; + let response_target = km.rsvp.as_ref().unwrap_or(&km.source).clone(); + if let Err(e) = handle_message( + &mut state, + km, + &caps_oracle, + ) + .await + { + error_message( + &state.our, + km_id, + response_target, + e, + &state.send_to_loop + ).await; + }; + } + } + } +} + +async fn handle_network_error( + wrapped_error: WrappedSendError, + active_subscriptions: &ActiveSubscriptions, + response_channels: &ResponseChannels, + print_tx: &PrintSender, +) { + verbose_print(&print_tx, "eth: got network error").await; + // if we hold active subscriptions for the remote node that this error refers to, + // close them here -- they will need to resubscribe + // TODO is this necessary? + if let Some(sub_map) = active_subscriptions.get(&wrapped_error.source) { + for (_sub_id, sub) in sub_map.iter() { + if let ActiveSub::Local(handle) = sub { + verbose_print( + &print_tx, + "eth: closing local sub in response to network error", + ) + .await; + handle.abort(); + } + } + } + // we got an error from a remote node provider -- + // forward it to response channel if it exists + if let Some(chan) = response_channels.get(&wrapped_error.id) { + // can't close channel here, as response may be an error + // and fulfill_request may wish to try other providers. + verbose_print(&print_tx, "eth: sent network error to response channel").await; + let _ = chan.send(Err(wrapped_error)).await; + } +} + +/// handle incoming requests, namely [`EthAction`] and [`EthConfigAction`]. +/// also handle responses that are passthroughs from remote provider nodes. +async fn handle_message( + state: &mut ModuleState, + km: KernelMessage, + caps_oracle: &CapMessageSender, +) -> Result<(), EthError> { + match &km.message { + Message::Response(_) => { + // map response to the correct channel + if let Some(chan) = state.response_channels.get(&km.id) { + // can't close channel here, as response may be an error + // and fulfill_request may wish to try other providers. + let _ = chan.send(Ok(km)).await; + } else { + verbose_print( + &state.print_tx, + "eth: got response but no matching channel found", + ) + .await; + } + } + Message::Request(req) => { + let timeout = req.expects_response.unwrap_or(60); + let Ok(req) = serde_json::from_slice::(&req.body) else { + return Err(EthError::MalformedRequest); + }; + match req { + IncomingReq::EthAction(eth_action) => { + return handle_eth_action(state, km, timeout, eth_action).await; + } + IncomingReq::EthConfigAction(eth_config_action) => { + kernel_message( + &state.our.clone(), + km.id, + km.rsvp.as_ref().unwrap_or(&km.source).clone(), + None, + false, + None, + handle_eth_config_action(state, caps_oracle, &km, eth_config_action).await, + &state.send_to_loop, + ) + .await; + } + IncomingReq::EthSubResult(eth_sub_result) => { + // forward this to rsvp, if we have the sub id in our active subs + let Some(rsvp) = km.rsvp else { + return Ok(()); // no rsvp, no need to forward + }; + let sub_id = match eth_sub_result { + Ok(EthSub { id, .. }) => id, + Err(EthSubError { id, .. }) => id, + }; + if let Some(sub_map) = state.active_subscriptions.get(&rsvp) { + if let Some(ActiveSub::Remote { + provider_node, + sender, + .. + }) = sub_map.get(&sub_id) + { + if provider_node == &km.source.node { + if let Ok(()) = sender.send(eth_sub_result).await { + return Ok(()); + } + } + } + } + // tell the remote provider that we don't have this sub + // so they can stop sending us updates + verbose_print( + &state.print_tx, + "eth: got eth_sub_result but no matching sub found", + ) + .await; + kernel_message( + &state.our.clone(), + km.id, + km.source.clone(), + None, + true, + None, + EthAction::UnsubscribeLogs(sub_id), + &state.send_to_loop, + ) + .await; + } + IncomingReq::SubKeepalive(sub_id) => { + // source expects that we have a local sub for them with this id + // if we do, no action required, otherwise, throw them an error. + if let Some(sub_map) = state.active_subscriptions.get(&km.source) { + if sub_map.contains_key(&sub_id) { + return Ok(()); + } + } + } + } + } + } + Ok(()) +} + +async fn handle_eth_action( + state: &mut ModuleState, + km: KernelMessage, + timeout: u64, + eth_action: EthAction, +) -> Result<(), EthError> { + // check our access settings if the request is from a remote node + if km.source.node != *state.our { + if state.access_settings.deny.contains(&km.source.node) { + verbose_print( + &state.print_tx, + "eth: got eth_action from unauthorized remote source", + ) + .await; + return Err(EthError::PermissionDenied); + } + if !state.access_settings.public { + if !state.access_settings.allow.contains(&km.source.node) { + verbose_print( + &state.print_tx, + "eth: got eth_action from unauthorized remote source", + ) + .await; + return Err(EthError::PermissionDenied); + } + } + } + + verbose_print( + &state.print_tx, + &format!("eth: handling eth_action {eth_action:?}"), + ) + .await; + + // for each incoming action, we need to assign a provider from our map + // based on the chain id. once we assign a provider, we can use it for + // this request. if the provider is not usable, cycle through options + // before returning an error. + match eth_action { + EthAction::SubscribeLogs { sub_id, .. } => { + tokio::spawn(subscription::create_new_subscription( + state.our.to_string(), + km.id, + km.source.clone(), + km.rsvp, + state.send_to_loop.clone(), + sub_id, + eth_action, + state.providers.clone(), + state.active_subscriptions.clone(), + state.response_channels.clone(), + state.print_tx.clone(), + )); + } + EthAction::UnsubscribeLogs(sub_id) => { + let mut sub_map = state + .active_subscriptions + .entry(km.source) + .or_insert(HashMap::new()); + if let Some(sub) = sub_map.remove(&sub_id) { + sub.close(sub_id, state).await; + } + } + EthAction::Request { .. } => { + let (sender, receiver) = tokio::sync::mpsc::channel(1); + state.response_channels.insert(km.id, sender); + let our = state.our.to_string(); + let send_to_loop = state.send_to_loop.clone(); + let providers = state.providers.clone(); + let response_channels = state.response_channels.clone(); + let print_tx = state.print_tx.clone(); + tokio::spawn(async move { + match tokio::time::timeout( + std::time::Duration::from_secs(timeout), + fulfill_request( + &our, + km.id, + &send_to_loop, + eth_action, + providers, + receiver, + &print_tx, + ), + ) + .await + { + Ok(response) => { + kernel_message( + &our, + km.id, + km.rsvp.unwrap_or(km.source), + None, + false, + None, + response, + &send_to_loop, + ) + .await; + } + Err(_) => { + // task timeout + error_message(&our, km.id, km.source, EthError::RpcTimeout, &send_to_loop) + .await; + } + } + response_channels.remove(&km.id); + }); + } + } + Ok(()) +} + +async fn fulfill_request( + our: &str, + km_id: u64, + send_to_loop: &MessageSender, + eth_action: EthAction, + providers: Providers, + mut remote_request_receiver: ProcessMessageReceiver, + print_tx: &PrintSender, +) -> EthResponse { + let EthAction::Request { + chain_id, + ref method, + ref params, + } = eth_action + else { + return EthResponse::Err(EthError::PermissionDenied); // will never hit + }; + let Some(method) = to_static_str(&method) else { + return EthResponse::Err(EthError::InvalidMethod(method.to_string())); + }; + let Some(mut aps) = providers.get_mut(&chain_id) else { + return EthResponse::Err(EthError::NoRpcForChain); + }; + // first, try any url providers we have for this chain, + // then if we have none or they all fail, go to node provider. + // finally, if no provider works, return an error. + for url_provider in &mut aps.urls { + let pubsub = match &url_provider.pubsub { + Some(pubsub) => pubsub, + None => { + if let Ok(()) = activate_url_provider(url_provider).await { + verbose_print(print_tx, "eth: activated a url provider").await; + url_provider.pubsub.as_ref().unwrap() + } else { + continue; + } + } + }; + let Ok(value) = pubsub.inner().prepare(method, params.clone()).await else { + // this provider failed and needs to be reset + url_provider.pubsub = None; + continue; + }; + return EthResponse::Response { value }; + } + for node_provider in &mut aps.nodes { + let response = forward_to_node_provider( + our, + km_id, + node_provider, + eth_action.clone(), + send_to_loop, + &mut remote_request_receiver, + ) + .await; + if let EthResponse::Err(e) = response { + if e == EthError::RpcMalformedResponse { + node_provider.usable = false; + } + } else { + return response; + } + } + EthResponse::Err(EthError::NoRpcForChain) +} + +/// take an EthAction and send it to a node provider, then await a response. +async fn forward_to_node_provider( + our: &str, + km_id: u64, + node_provider: &NodeProvider, + eth_action: EthAction, + send_to_loop: &MessageSender, + receiver: &mut ProcessMessageReceiver, +) -> EthResponse { + if !node_provider.usable || node_provider.name == our { + return EthResponse::Err(EthError::PermissionDenied); + } + // in order, forward the request to each node provider + // until one sends back a satisfactory response + kernel_message( + our, + km_id, + Address { + node: node_provider.name.clone(), + process: ETH_PROCESS_ID.clone(), + }, + None, + true, + Some(60), // TODO + eth_action.clone(), + &send_to_loop, + ) + .await; + let Ok(Some(Ok(response_km))) = + tokio::time::timeout(std::time::Duration::from_secs(30), receiver.recv()).await + else { + return EthResponse::Err(EthError::RpcTimeout); + }; + let Message::Response((resp, _context)) = response_km.message else { + // if we hit this, they spoofed a request with same id, ignore and possibly punish + return EthResponse::Err(EthError::RpcMalformedResponse); + }; + let Ok(eth_response) = serde_json::from_slice::(&resp.body) else { + // if we hit this, they sent a malformed response, ignore and possibly punish + return EthResponse::Err(EthError::RpcMalformedResponse); + }; + eth_response +} + +async fn handle_eth_config_action( + state: &mut ModuleState, + caps_oracle: &CapMessageSender, + km: &KernelMessage, + eth_config_action: EthConfigAction, +) -> EthConfigResponse { + if km.source.node != *state.our { + verbose_print( + &state.print_tx, + "eth: got eth_config_action from unauthorized remote source", + ) + .await; + return EthConfigResponse::PermissionDenied; + } + + // check capabilities to ensure the sender is allowed to make this request + if !check_for_root_cap(&state.our, &km.source.process, caps_oracle).await { + verbose_print( + &state.print_tx, + "eth: got eth_config_action from unauthorized local source", + ) + .await; + return EthConfigResponse::PermissionDenied; + } + + verbose_print( + &state.print_tx, + &format!("eth: handling eth_config_action {eth_config_action:?}"), + ) + .await; + + // modify our providers and access settings based on config action + match eth_config_action { + EthConfigAction::AddProvider(provider) => { + let mut aps = state + .providers + .entry(provider.chain_id) + .or_insert(ActiveProviders { + urls: vec![], + nodes: vec![], + }); + aps.add_provider_config(provider); + } + EthConfigAction::RemoveProvider((chain_id, remove)) => { + if let Some(mut aps) = state.providers.get_mut(&chain_id) { + aps.remove_provider(&remove); + } + } + EthConfigAction::SetPublic => { + state.access_settings.public = true; + } + EthConfigAction::SetPrivate => { + state.access_settings.public = false; + } + EthConfigAction::AllowNode(node) => { + state.access_settings.allow.insert(node); + } + EthConfigAction::UnallowNode(node) => { + state.access_settings.allow.remove(&node); + } + EthConfigAction::DenyNode(node) => { + state.access_settings.deny.insert(node); + } + EthConfigAction::UndenyNode(node) => { + state.access_settings.deny.remove(&node); + } + EthConfigAction::SetProviders(new_providers) => { + let new_map = DashMap::new(); + for entry in new_providers { + let mut aps = new_map.entry(entry.chain_id).or_insert(ActiveProviders { + urls: vec![], + nodes: vec![], + }); + aps.add_provider_config(entry); + } + state.providers = Arc::new(new_map); + } + EthConfigAction::GetProviders => { + return EthConfigResponse::Providers(providers_to_saved_configs(&state.providers)); + } + EthConfigAction::GetAccessSettings => { + return EthConfigResponse::AccessSettings(state.access_settings.clone()); + } + } + EthConfigResponse::Ok +} + +fn providers_to_saved_configs(providers: &Providers) -> SavedConfigs { + providers + .iter() + .map(|entry| { + entry + .urls + .iter() + .map(|url_provider| ProviderConfig { + chain_id: *entry.key(), + provider: NodeOrRpcUrl::RpcUrl(url_provider.url.clone()), + trusted: url_provider.trusted, + }) + .chain(entry.nodes.iter().map(|node_provider| ProviderConfig { + chain_id: *entry.key(), + provider: NodeOrRpcUrl::Node { + kns_update: KnsUpdate { + name: node_provider.name.clone(), + owner: "".to_string(), + node: "".to_string(), + public_key: "".to_string(), + ip: "".to_string(), + port: 0, + routers: vec![], + }, + use_as_provider: node_provider.usable, + }, + trusted: node_provider.trusted, + })) + .collect::>() + }) + .flatten() + .collect() +} + +async fn check_for_root_cap( + our: &str, + process: &ProcessId, + caps_oracle: &CapMessageSender, +) -> bool { + let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); + caps_oracle + .send(CapMessage::Has { + on: process.clone(), + cap: Capability { + issuer: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + params: serde_json::to_string(&serde_json::json!({ + "root": true, + })) + .unwrap(), + }, + responder: send_cap_bool, + }) + .await + .expect("eth: capability oracle died!"); + recv_cap_bool.await.unwrap_or(false) +} + +async fn verbose_print(print_tx: &PrintSender, content: &str) { + let _ = print_tx + .send(Printout { + verbosity: 2, + content: content.to_string(), + }) + .await; +} + +async fn error_message( + our: &str, + km_id: u64, + target: Address, + error: EthError, + send_to_loop: &MessageSender, +) { + kernel_message( + our, + km_id, + target, + None, + false, + None, + EthResponse::Err(error), + send_to_loop, + ) + .await +} + +async fn kernel_message( + our: &str, + km_id: u64, + target: Address, + rsvp: Option
, + req: bool, + timeout: Option, + body: T, + send_to_loop: &MessageSender, +) { + let _ = send_to_loop + .send(KernelMessage { + id: km_id, + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target, + rsvp, + message: if req { + Message::Request(Request { + inherit: false, + expects_response: timeout, + body: serde_json::to_vec(&body).unwrap(), + metadata: None, + capabilities: vec![], + }) + } else { + Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&body).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + )) + }, + lazy_load_blob: None, + }) + .await; +} diff --git a/kinode/src/eth/provider.rs b/kinode/src/eth/provider.rs deleted file mode 100644 index 4f45a605f..000000000 --- a/kinode/src/eth/provider.rs +++ /dev/null @@ -1,490 +0,0 @@ -use alloy_providers::provider::Provider; -use alloy_pubsub::{PubSubFrontend, RawSubscription}; -use alloy_rpc_client::ClientBuilder; -use alloy_rpc_types::pubsub::SubscriptionResult; -use alloy_transport_ws::WsConnect; -use anyhow::Result; -use dashmap::DashMap; -use lib::types::core::*; -use lib::types::eth::*; -use std::str::FromStr; -use std::sync::Arc; -use tokio::task::JoinHandle; -use url::Url; - -/// Provider config. Can currently be a node or a ws provider instance. -/// Future: add chainId configs, several nodes and fallbacks. -pub enum ProviderConfig { - Node(String), - Provider(Provider), -} - -/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers -/// and using them to service indexing requests from other apps. This could also be done by a wasm -/// app, but in the future, this process will hopefully expand in scope to perform more complex -/// indexing and ETH node responsibilities. -pub async fn provider( - our: String, - provider_node: ProviderInput, - public: bool, - send_to_loop: MessageSender, - mut recv_in_client: MessageReceiver, - _print_tx: PrintSender, -) -> Result<()> { - let our = Arc::new(our); - - // Initialize the provider conditionally based on rpc_url - // Todo: make provider support multiple transports, one direct and another passthrough. - let provider_config = match provider_node { - ProviderInput::Ws(rpc_url) => { - // Validate and parse the WebSocket URL - match Url::parse(&rpc_url)?.scheme() { - "ws" | "wss" => { - let connector = WsConnect { - url: rpc_url, - auth: None, - }; - let client = ClientBuilder::default().ws(connector).await?; - ProviderConfig::Provider(Provider::new_with_client(client)) - } - _ => { - return Err(anyhow::anyhow!( - "Only `ws://` or `wss://` URLs are supported." - )) - } - } - } - ProviderInput::Node(node_id) => { - // Directly use the node ID - ProviderConfig::Node(node_id) - } - }; - - let provider_config = Arc::new(provider_config); - - // handles of longrunning subscriptions. - let connections: DashMap<(ProcessId, u64), JoinHandle>> = DashMap::new(); - let connections = Arc::new(connections); - - // add whitelist, logic in provider middleware? - while let Some(km) = recv_in_client.recv().await { - // clone Arcs - let our = our.clone(); - let send_to_loop = send_to_loop.clone(); - let provider_config = provider_config.clone(); - let connections = connections.clone(); - tokio::spawn(async move { - if let Err(e) = handle_message( - &our, - &km, - &send_to_loop, - provider_config, - connections, - public, - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our.to_string(), km, e)) - .await; - }; - }); - } - Err(anyhow::anyhow!("eth: fatal: message receiver closed!")) -} - -async fn handle_message( - our: &str, - km: &KernelMessage, - send_to_loop: &MessageSender, - provider_config: Arc, - connections: Arc>>>, - public: bool, -) -> Result<(), EthError> { - match &km.message { - Message::Request(req) => { - match &*provider_config { - ProviderConfig::Node(node) => { - if km.source.node == our { - // we have no provider, let's send this request to someone who has one. - let request = KernelMessage { - id: km.id, - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: Address { - node: "jugodenaranja.os".to_string(), - process: ETH_PROCESS_ID.clone(), - }, - rsvp: Some(km.source.clone()), - message: Message::Request(req.clone()), - lazy_load_blob: None, - }; - - let _ = send_to_loop.send(request).await; - } else { - // either someone asking us for rpc, or we are passing through a sub event. - handle_remote_request(our, km, send_to_loop, None, connections, public) - .await? - } - } - ProviderConfig::Provider(provider) => { - if km.source.node == our { - handle_local_request(our, km, send_to_loop, &provider, connections, public) - .await? - } else { - handle_remote_request( - our, - km, - send_to_loop, - Some(provider), - connections, - public, - ) - .await? - } - } - } - } - Message::Response(_) => { - // handle passthrough responses, send to rsvp. - if km.source.process == "eth:distro:sys" { - if let Some(rsvp) = &km.rsvp { - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: rsvp.clone(), - rsvp: None, - message: km.message.clone(), - lazy_load_blob: None, - }) - .await; - } - } - } - } - Ok(()) -} - -async fn handle_local_request( - our: &str, - km: &KernelMessage, - send_to_loop: &MessageSender, - provider: &Provider, - connections: Arc>>>, - public: bool, -) -> Result<(), EthError> { - let Message::Request(req) = &km.message else { - return Err(EthError::InvalidMethod( - "eth: only accepts requests".to_string(), - )); - }; - let action = serde_json::from_slice::(&req.body).map_err(|e| { - EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e)) - })?; - - // we might want some of these in payloads.. sub items? - let return_body: EthResponse = match action { - EthAction::SubscribeLogs { - sub_id, - kind, - params, - } => { - let sub_id = (km.target.process.clone(), sub_id); - - let kind = serde_json::to_value(&kind).unwrap(); - let params = serde_json::to_value(¶ms).unwrap(); - - let id = provider - .inner() - .prepare("eth_subscribe", [kind, params]) - .await - .map_err(|e| EthError::TransportError(e.to_string()))?; - - let rx = provider.inner().get_raw_subscription(id).await; - let handle = tokio::spawn(handle_subscription_stream( - our.to_string(), - sub_id.1.clone(), - rx, - km.source.clone(), - km.rsvp.clone(), - send_to_loop.clone(), - )); - - connections.insert(sub_id, handle); - EthResponse::Ok - } - EthAction::UnsubscribeLogs(sub_id) => { - let sub_id = (km.target.process.clone(), sub_id); - let handle = connections - .remove(&sub_id) - .ok_or(EthError::SubscriptionNotFound)?; - - handle.1.abort(); - EthResponse::Ok - } - EthAction::Request { method, params } => { - let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?; - - let response: serde_json::Value = provider - .inner() - .prepare(method, params) - .await - .map_err(|e| EthError::TransportError(e.to_string()))?; - EthResponse::Response { value: response } - } - }; - if let Some(_) = req.expects_response { - let _ = send_to_loop - .send(KernelMessage { - id: km.id, - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: km.source.clone(), - rsvp: km.rsvp.clone(), - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&return_body).unwrap(), - metadata: req.metadata.clone(), - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await; - } - - Ok(()) -} - -// here we are either processing another nodes request. -// or we are passing through an ethSub Request.. -async fn handle_remote_request( - our: &str, - km: &KernelMessage, - send_to_loop: &MessageSender, - provider: Option<&Provider>, - connections: Arc>>>, - public: bool, -) -> Result<(), EthError> { - let Message::Request(req) = &km.message else { - return Err(EthError::InvalidMethod( - "eth: only accepts requests".to_string(), - )); - }; - - if let Some(provider) = provider { - // we need some sort of agreement perhaps on rpc providing. - // even with an agreement, fake ethsubevents could be sent to us. - // light clients could verify blocks perhaps... - if !public { - return Err(EthError::PermissionDenied("not on the list.".to_string())); - } - - let action = serde_json::from_slice::(&req.body).map_err(|e| { - EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e)) - })?; - - let return_body: EthResponse = match action { - EthAction::SubscribeLogs { - sub_id, - kind, - params, - } => { - let sub_id = (km.target.process.clone(), sub_id); - - let kind = serde_json::to_value(&kind).unwrap(); - let params = serde_json::to_value(¶ms).unwrap(); - - let id = provider - .inner() - .prepare("eth_subscribe", [kind, params]) - .await - .map_err(|e| EthError::TransportError(e.to_string()))?; - - let rx = provider.inner().get_raw_subscription(id).await; - let handle = tokio::spawn(handle_subscription_stream( - our.to_string(), - sub_id.1.clone(), - rx, - km.target.clone(), - km.rsvp.clone(), - send_to_loop.clone(), - )); - - connections.insert(sub_id, handle); - EthResponse::Ok - } - EthAction::UnsubscribeLogs(sub_id) => { - let sub_id = (km.target.process.clone(), sub_id); - let handle = connections - .remove(&sub_id) - .ok_or(EthError::SubscriptionNotFound)?; - - handle.1.abort(); - EthResponse::Ok - } - EthAction::Request { method, params } => { - let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?; - - let response: serde_json::Value = provider - .inner() - .prepare(method, params) - .await - .map_err(|e| EthError::TransportError(e.to_string()))?; - - EthResponse::Response { value: response } - } - }; - - let response = KernelMessage { - id: km.id, - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: km.source.clone(), - rsvp: km.rsvp.clone(), - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&return_body).unwrap(), - metadata: req.metadata.clone(), - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }; - - let _ = send_to_loop.send(response).await; - } else { - // We do not have a provider, this is a reply for a request made by us. - if let Ok(eth_sub) = serde_json::from_slice::(&req.body) { - // forward... - if let Some(target) = km.rsvp.clone() { - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target: target, - rsvp: None, - message: Message::Request(req.clone()), - lazy_load_blob: None, - }) - .await; - } - } - } - Ok(()) -} - -/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map. -/// This task is responsible for connecting to the ETH RPC provider and streaming logs -/// for a specific subscription made by a process. -async fn handle_subscription_stream( - our: String, - sub_id: u64, - mut rx: RawSubscription, - target: Address, - rsvp: Option
, - send_to_loop: MessageSender, -) -> Result<(), EthError> { - match rx.recv().await { - Err(e) => { - let error = Err(EthError::SubscriptionClosed(sub_id))?; - let _ = send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our, - process: ETH_PROCESS_ID.clone(), - }, - target: target.clone(), - rsvp: rsvp.clone(), - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: serde_json::to_vec(&EthSubResult::Err(EthSubError { - id: sub_id, - error: e.to_string(), - })) - .unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .unwrap(); - } - Ok(value) => { - let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| { - EthError::RpcError("eth: failed to deserialize subscription result".to_string()) - })?; - send_to_loop - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our, - process: ETH_PROCESS_ID.clone(), - }, - target: target.clone(), - rsvp: rsvp.clone(), - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: serde_json::to_vec(&EthSubResult::Ok(EthSub { - id: sub_id, - result: event, - })) - .unwrap(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - .unwrap(); - } - } - Err(EthError::SubscriptionClosed(sub_id)) -} - -fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage { - let source = km.rsvp.unwrap_or_else(|| Address { - node: our_node.clone(), - process: km.source.process.clone(), - }); - KernelMessage { - id: km.id, - source: Address { - node: our_node, - process: ETH_PROCESS_ID.clone(), - }, - target: source, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&EthResponse::Err(error)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - } -} diff --git a/kinode/src/eth/subscription.rs b/kinode/src/eth/subscription.rs new file mode 100644 index 000000000..66d22f0bb --- /dev/null +++ b/kinode/src/eth/subscription.rs @@ -0,0 +1,354 @@ +use crate::eth::*; +use alloy_pubsub::RawSubscription; +use alloy_rpc_types::pubsub::SubscriptionResult; +use anyhow::Result; +use std::collections::HashMap; + +/// cleans itself up when the subscription is closed or fails. +pub async fn create_new_subscription( + our: String, + km_id: u64, + target: Address, + rsvp: Option
, + send_to_loop: MessageSender, + sub_id: u64, + eth_action: EthAction, + providers: Providers, + active_subscriptions: ActiveSubscriptions, + response_channels: ResponseChannels, + print_tx: PrintSender, +) { + verbose_print(&print_tx, "eth: creating new subscription").await; + match build_subscription( + &our, + km_id, + &target, + &send_to_loop, + ð_action, + &providers, + &response_channels, + &print_tx, + ) + .await + { + Ok(maybe_raw_sub) => { + // send a response to the target that the subscription was successful + kernel_message( + &our, + km_id, + target.clone(), + rsvp.clone(), + false, + None, + EthResponse::Ok, + &send_to_loop, + ) + .await; + let mut subs = active_subscriptions + .entry(target.clone()) + .or_insert(HashMap::new()); + let active_subscriptions = active_subscriptions.clone(); + subs.insert( + sub_id, + match maybe_raw_sub { + Ok(rx) => { + // this is a local sub, as in, we connect to the rpc endpt + ActiveSub::Local(tokio::spawn(async move { + // await the subscription error and kill it if so + if let Err(e) = maintain_local_subscription( + &our, + sub_id, + rx, + &target, + &rsvp, + &send_to_loop, + ) + .await + { + verbose_print( + &print_tx, + "eth: closed local subscription due to error", + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + None, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + active_subscriptions.entry(target).and_modify(|sub_map| { + sub_map.remove(&km_id); + }); + } + })) + } + Err((provider_node, remote_sub_id)) => { + // this is a remote sub, given by a relay node + let (sender, rx) = tokio::sync::mpsc::channel(10); + let keepalive_km_id = rand::random(); + let (keepalive_err_sender, keepalive_err_receiver) = + tokio::sync::mpsc::channel(1); + response_channels.insert(keepalive_km_id, keepalive_err_sender); + ActiveSub::Remote { + provider_node: provider_node.clone(), + handle: tokio::spawn(async move { + if let Err(e) = maintain_remote_subscription( + &our, + &provider_node, + remote_sub_id, + sub_id, + keepalive_km_id, + rx, + keepalive_err_receiver, + &target, + &send_to_loop, + ) + .await + { + verbose_print( + &print_tx, + "eth: closed subscription with provider node due to error", + ) + .await; + kernel_message( + &our, + rand::random(), + target.clone(), + None, + true, + None, + EthSubResult::Err(e), + &send_to_loop, + ) + .await; + active_subscriptions.entry(target).and_modify(|sub_map| { + sub_map.remove(&sub_id); + }); + response_channels.remove(&keepalive_km_id); + } + }), + sender, + } + } + }, + ); + } + Err(e) => { + error_message(&our, km_id, target.clone(), e, &send_to_loop).await; + } + } +} + +/// terrible abuse of result in return type, yes, sorry +async fn build_subscription( + our: &str, + km_id: u64, + target: &Address, + send_to_loop: &MessageSender, + eth_action: &EthAction, + providers: &Providers, + response_channels: &ResponseChannels, + print_tx: &PrintSender, +) -> Result, EthError> { + let EthAction::SubscribeLogs { + chain_id, + kind, + params, + .. + } = eth_action + else { + return Err(EthError::PermissionDenied); // will never hit + }; + let Some(mut aps) = providers.get_mut(&chain_id) else { + return Err(EthError::NoRpcForChain); + }; + // first, try any url providers we have for this chain, + // then if we have none or they all fail, go to node providers. + // finally, if no provider works, return an error. + for url_provider in &mut aps.urls { + let pubsub = match &url_provider.pubsub { + Some(pubsub) => pubsub, + None => { + if let Ok(()) = activate_url_provider(url_provider).await { + verbose_print(print_tx, "eth: activated a url provider").await; + url_provider.pubsub.as_ref().unwrap() + } else { + continue; + } + } + }; + let kind = serde_json::to_value(&kind).unwrap(); + let params = serde_json::to_value(¶ms).unwrap(); + if let Ok(id) = pubsub + .inner() + .prepare("eth_subscribe", [kind, params]) + .await + { + let rx = pubsub.inner().get_raw_subscription(id).await; + return Ok(Ok(rx)); + } + // this provider failed and needs to be reset + url_provider.pubsub = None; + } + // now we need a response channel + let (sender, mut response_receiver) = tokio::sync::mpsc::channel(1); + response_channels.insert(km_id, sender); + // we need to create our own unique sub id because in the remote provider node, + // all subs will be identified under our process address. + let remote_sub_id = rand::random(); + for node_provider in &mut aps.nodes { + match forward_to_node_provider( + &our, + km_id, + node_provider, + EthAction::SubscribeLogs { + sub_id: remote_sub_id, + chain_id: chain_id.clone(), + kind: kind.clone(), + params: params.clone(), + }, + &send_to_loop, + &mut response_receiver, + ) + .await + { + EthResponse::Ok => { + kernel_message( + &our, + km_id, + target.clone(), + None, + false, + None, + EthResponse::Ok, + &send_to_loop, + ) + .await; + response_channels.remove(&km_id); + return Ok(Err((node_provider.name.clone(), remote_sub_id))); + } + EthResponse::Response { .. } => { + // the response to a SubscribeLogs request must be an 'ok' + node_provider.usable = false; + } + EthResponse::Err(e) => { + if e == EthError::RpcMalformedResponse { + node_provider.usable = false; + } + } + } + } + return Err(EthError::NoRpcForChain); +} + +async fn maintain_local_subscription( + our: &str, + sub_id: u64, + mut rx: RawSubscription, + target: &Address, + rsvp: &Option
, + send_to_loop: &MessageSender, +) -> Result<(), EthSubError> { + while let Ok(value) = rx.recv().await { + let result: SubscriptionResult = + serde_json::from_str(value.get()).map_err(|e| EthSubError { + id: sub_id, + error: e.to_string(), + })?; + kernel_message( + our, + rand::random(), + target.clone(), + rsvp.clone(), + true, + None, + EthSubResult::Ok(EthSub { id: sub_id, result }), + &send_to_loop, + ) + .await; + } + Err(EthSubError { + id: sub_id, + error: "subscription closed unexpectedly".to_string(), + }) +} + +/// handle the subscription updates from a remote provider, +/// and also perform keepalive checks on that provider. +/// current keepalive is 30s, this can be adjusted as desired +async fn maintain_remote_subscription( + our: &str, + provider_node: &str, + remote_sub_id: u64, + sub_id: u64, + keepalive_km_id: u64, + mut rx: tokio::sync::mpsc::Receiver, + mut net_error_rx: ProcessMessageReceiver, + target: &Address, + send_to_loop: &MessageSender, +) -> Result<(), EthSubError> { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + loop { + tokio::select! { + incoming = rx.recv() => { + match incoming { + Some(EthSubResult::Ok(upd)) => { + kernel_message( + &our, + rand::random(), + target.clone(), + None, + true, + None, + EthSubResult::Ok(EthSub { + id: sub_id, + result: upd.result, + }), + &send_to_loop, + ) + .await; + } + Some(EthSubResult::Err(e)) => { + return Err(EthSubError { + id: sub_id, + error: e.error, + }); + } + None => { + return Err(EthSubError { + id: sub_id, + error: "subscription closed unexpectedly".to_string(), + }); + + } + } + } + _ = interval.tick() => { + // perform keepalive + kernel_message( + &our, + keepalive_km_id, + Address { node: provider_node.to_string(), process: ETH_PROCESS_ID.clone() }, + None, + true, + Some(30), + IncomingReq::SubKeepalive(remote_sub_id), + &send_to_loop, + ).await; + } + incoming = net_error_rx.recv() => { + if let Some(Err(_net_error)) = incoming { + return Err(EthSubError { + id: sub_id, + error: "subscription node-provider failed keepalive".to_string(), + }); + } + } + } + } +} diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index e5fac26fd..c2b6c0fb5 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -20,11 +20,6 @@ const PROCESS_CHANNEL_CAPACITY: usize = 100; const DEFAULT_WIT_VERSION: u32 = 0; -type ProcessMessageSender = - tokio::sync::mpsc::Sender>; -type ProcessMessageReceiver = - tokio::sync::mpsc::Receiver>; - #[derive(Serialize, Deserialize)] struct StartProcessMetadata { source: t::Address, @@ -39,8 +34,11 @@ type Senders = HashMap; type ProcessHandles = HashMap>>; enum ProcessSender { - Runtime(t::MessageSender), - Userspace(ProcessMessageSender), + Runtime { + sender: t::MessageSender, + net_errors: Option, + }, + Userspace(t::ProcessMessageSender), } /// persist kernel's process_map state for next bootup @@ -457,13 +455,6 @@ async fn handle_kernel_request( // brutal and savage killing: aborting the task. // do not do this to a process if you don't want to risk // dropped messages / un-replied-to-requests / revoked caps - caps_oracle - .send(t::CapMessage::RevokeAll { - on: process_id.clone(), - responder: tokio::sync::oneshot::channel().0, - }) - .await - .expect("event loop: fatal: sender died"); let _ = senders.remove(&process_id); let process_handle = match process_handles.remove(&process_id) { Some(ph) => ph, @@ -485,7 +476,13 @@ async fn handle_kernel_request( .await; process_handle.abort(); process_map.remove(&process_id); - let _ = persist_state(&our_name, &send_to_loop, process_map).await; + caps_oracle + .send(t::CapMessage::RevokeAll { + on: process_id.clone(), + responder: tokio::sync::oneshot::channel().0, + }) + .await + .expect("event loop: fatal: sender died"); if request.expects_response.is_none() { return; } @@ -674,9 +671,14 @@ pub async fn kernel( mut recv_debug_in_loop: t::DebugReceiver, send_to_net: t::MessageSender, home_directory_path: String, - contract_address: String, - runtime_extensions: Vec<(t::ProcessId, t::MessageSender, bool)>, - default_pki_entries: Vec, + contract_chain_and_address: (u64, String), + runtime_extensions: Vec<( + t::ProcessId, + t::MessageSender, + Option, + bool, + )>, + default_pki_entries: Vec, ) -> Result<()> { let mut config = Config::new(); config.cache_config_load_default().unwrap(); @@ -693,10 +695,19 @@ pub async fn kernel( let mut senders: Senders = HashMap::new(); senders.insert( t::ProcessId::new(Some("net"), "distro", "sys"), - ProcessSender::Runtime(send_to_net.clone()), + ProcessSender::Runtime { + sender: send_to_net.clone(), + net_errors: None, // networking module does not accept net errors sent to it + }, ); - for (process_id, sender, _) in runtime_extensions { - senders.insert(process_id, ProcessSender::Runtime(sender)); + for (process_id, sender, net_error_sender, _) in runtime_extensions { + senders.insert( + process_id, + ProcessSender::Runtime { + sender, + net_errors: net_error_sender, + }, + ); } // each running process is stored in this map @@ -849,10 +860,8 @@ pub async fn kernel( message: t::Message::Request(t::Request { inherit: false, expects_response: None, - body: rmp_serde::to_vec(&crate::net::NetActions::KnsBatchUpdate( - default_pki_entries, - )) - .unwrap(), + body: rmp_serde::to_vec(&t::NetActions::KnsBatchUpdate(default_pki_entries)) + .unwrap(), metadata: None, capabilities: vec![], }), @@ -879,7 +888,7 @@ pub async fn kernel( message: t::Message::Request(t::Request { inherit: false, expects_response: None, - body: contract_address.as_bytes().to_vec(), + body: serde_json::to_vec(&contract_chain_and_address).unwrap(), metadata: None, capabilities: vec![], }), @@ -903,8 +912,8 @@ pub async fn kernel( Some(wrapped_network_error) = network_error_recv.recv() => { let _ = send_to_terminal.send( t::Printout { - verbosity: 2, - content: format!("event loop: got network error: {:?}", wrapped_network_error) + verbosity: 3, + content: format!("{wrapped_network_error:?}") } ).await; // forward the error to the relevant process @@ -912,10 +921,10 @@ pub async fn kernel( Some(ProcessSender::Userspace(sender)) => { let _ = sender.send(Err(wrapped_network_error)).await; } - Some(ProcessSender::Runtime(_sender)) => { - // TODO should runtime modules get these? no - // this will change if a runtime process ever makes - // a message directed to not-our-node + Some(ProcessSender::Runtime { net_errors, .. }) => { + if let Some(net_errors) = net_errors { + let _ = net_errors.send(wrapped_network_error).await; + } } None => { let _ = send_to_terminal @@ -1072,7 +1081,7 @@ pub async fn kernel( let _ = send_to_terminal.send( t::Printout { verbosity: 3, - content: format!("{}", kernel_message) + content: format!("{kernel_message}") } ).await; @@ -1103,7 +1112,7 @@ pub async fn kernel( Some(ProcessSender::Userspace(sender)) => { let _ = sender.send(Ok(kernel_message)).await; } - Some(ProcessSender::Runtime(sender)) => { + Some(ProcessSender::Runtime { sender, .. }) => { sender.send(kernel_message).await.expect("event loop: fatal: runtime module died"); } None => { @@ -1130,6 +1139,12 @@ pub async fn kernel( }, // capabilities oracle: handles all requests to add, drop, and check capabilities Some(cap_message) = caps_oracle_receiver.recv() => { + let _ = send_to_terminal.send( + t::Printout { + verbosity: 3, + content: format!("{cap_message:?}") + } + ).await; match cap_message { t::CapMessage::Add { on, caps, responder } => { // insert cap in process map @@ -1187,16 +1202,16 @@ pub async fn kernel( }, t::CapMessage::RevokeAll { on, responder } => { let Some(granter) = reverse_cap_index.get(&on) else { + let _ = persist_state(&our.name, &send_to_loop, &process_map).await; let _ = responder.send(true); continue; }; for (grantee, caps) in granter { - let Some(entry) = process_map.get_mut(&grantee) else { - continue; + if let Some(entry) = process_map.get_mut(&grantee) { + for cap in caps { + entry.capabilities.remove(&cap); + } }; - for cap in caps { - entry.capabilities.remove(&cap); - } } let _ = persist_state(&our.name, &send_to_loop, &process_map).await; let _ = responder.send(true); diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 434ac9f7b..66e7147ed 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,4 +1,3 @@ -use crate::kernel::{ProcessMessageReceiver, ProcessMessageSender}; use crate::KERNEL_PROCESS_ID; use anyhow::Result; use lib::types::core as t; @@ -33,9 +32,9 @@ pub struct ProcessState { /// information about ourself pub metadata: t::ProcessMetadata, /// pipe from which we get messages from the main event loop - pub recv_in_process: ProcessMessageReceiver, + pub recv_in_process: t::ProcessMessageReceiver, /// pipe to send messages to ourself (received in `recv_in_process`) - pub self_sender: ProcessMessageSender, + pub self_sender: t::ProcessMessageSender, /// pipe for sending messages to the main event loop pub send_to_loop: t::MessageSender, /// pipe for sending [`t::Printout`]s to the terminal @@ -492,8 +491,8 @@ pub async fn make_process_loop( metadata: t::ProcessMetadata, send_to_loop: t::MessageSender, send_to_terminal: t::PrintSender, - mut recv_in_process: ProcessMessageReceiver, - send_to_process: ProcessMessageSender, + mut recv_in_process: t::ProcessMessageReceiver, + send_to_process: t::ProcessMessageSender, wasm_bytes: Vec, caps_oracle: t::CapMessageSender, engine: Engine, @@ -548,24 +547,30 @@ pub async fn make_process_loop( metadata.our.process.package(), metadata.our.process.publisher() ); - if let Err(e) = fs::create_dir_all(&tmp_path).await { - panic!("failed creating tmp dir! {:?}", e); + + let mut wasi = WasiCtxBuilder::new(); + + // TODO make guarantees about this + if let Ok(Ok(())) = tokio::time::timeout( + std::time::Duration::from_secs(5), + fs::create_dir_all(&tmp_path), + ) + .await + { + if let Ok(wasi_tempdir) = + Dir::open_ambient_dir(tmp_path.clone(), wasmtime_wasi::sync::ambient_authority()) + { + wasi.preopened_dir( + wasi_tempdir, + DirPerms::all(), + FilePerms::all(), + tmp_path.clone(), + ) + .env("TEMP_DIR", tmp_path); + } } - let Ok(wasi_tempdir) = - Dir::open_ambient_dir(tmp_path.clone(), wasmtime_wasi::sync::ambient_authority()) - else { - panic!("failed to open ambient tmp dir!"); - }; - let wasi = WasiCtxBuilder::new() - .preopened_dir( - wasi_tempdir, - DirPerms::all(), - FilePerms::all(), - tmp_path.clone(), - ) - .env("TEMP_DIR", tmp_path) - .stderr(wasi_stderr.clone()) - .build(); + + let wasi = wasi.stderr(wasi_stderr.clone()).build(); wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap(); diff --git a/kinode/src/main.rs b/kinode/src/main.rs index e0df5e57c..f731599e6 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -2,17 +2,14 @@ use anyhow::Result; use clap::{arg, value_parser, Command}; -use rand::seq::SliceRandom; +use lib::types::core::*; +#[cfg(feature = "simulation-mode")] +use ring::{rand::SystemRandom, signature, signature::KeyPair}; use std::env; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio::{fs, time::timeout}; -use lib::types::core::*; - -#[cfg(feature = "simulation-mode")] -use ring::{rand::SystemRandom, signature, signature::KeyPair}; - mod eth; mod http; mod kernel; @@ -40,12 +37,6 @@ const SQLITE_CHANNEL_CAPACITY: usize = 1_000; const VERSION: &str = env!("CARGO_PKG_VERSION"); -/// This can and should be an environment variable / setting. It configures networking -/// such that indirect nodes always use routers, even when target is a direct node, -/// such that only their routers can ever see their physical networking details. -#[cfg(not(feature = "simulation-mode"))] -const REVEAL_IP: bool = true; - /// default routers as a eth-provider fallback const DEFAULT_PROVIDERS_TESTNET: &str = include_str!("../default_providers_testnet.json"); const DEFAULT_PROVIDERS_MAINNET: &str = include_str!("../default_providers_mainnet.json"); @@ -111,17 +102,15 @@ async fn main() { .default_value("false") .value_parser(value_parser!(bool)), ) - .arg( - arg!(--public "If set, allow rpc passthrough") - .default_value("false") - .value_parser(value_parser!(bool)), - ) - .arg(arg!(--rpcnode "RPC node provider must be a valid address").required(false)) - .arg(arg!(--rpc "Ethereum RPC endpoint (must be wss://)").required(false)) .arg( arg!(--verbosity "Verbosity level: higher is more verbose") .default_value("0") .value_parser(value_parser!(u8)), + ) + .arg( + arg!(--"reveal-ip" "If set to false, as an indirect node, always use routers to connect to other nodes.") + .default_value("true") + .value_parser(value_parser!(bool)), ); #[cfg(feature = "simulation-mode")] @@ -146,9 +135,6 @@ async fn main() { None => (8080, false), }; let on_testnet = *matches.get_one::("testnet").unwrap(); - let public = *matches.get_one::("public").unwrap(); - let rpc_url = matches.get_one::("rpc").cloned(); - let rpc_node = matches.get_one::("rpcnode").cloned(); #[cfg(not(feature = "simulation-mode"))] let is_detached = false; @@ -163,10 +149,10 @@ async fn main() { *matches.get_one::("detached").unwrap(), ); - let contract_address = if on_testnet { - register::KNS_SEPOLIA_ADDRESS + let contract_chain_and_address: (u64, String) = if on_testnet { + (11155111, register::KNS_SEPOLIA_ADDRESS.to_string()) } else { - register::KNS_OPTIMISM_ADDRESS + (10, register::KNS_OPTIMISM_ADDRESS.to_string()) }; let verbose_mode = *matches.get_one::("verbosity").unwrap(); @@ -196,47 +182,24 @@ async fn main() { } } + if let Err(e) = fs::create_dir_all(home_directory_path).await { + panic!("failed to create home directory: {:?}", e); + } + println!("home at {}\r", home_directory_path); + // default eth providers/routers - type KnsUpdate = crate::net::KnsUpdate; - let default_pki_entries: Vec = - match fs::read_to_string(format!("{}/.default_providers", home_directory_path)).await { - Ok(contents) => serde_json::from_str(&contents).unwrap(), + let eth_provider_config: lib::eth::SavedConfigs = + match fs::read_to_string(format!("{}/.eth_providers", home_directory_path)).await { + Ok(contents) => { + println!("loaded saved eth providers\r"); + serde_json::from_str(&contents).unwrap() + } Err(_) => match on_testnet { true => serde_json::from_str(DEFAULT_PROVIDERS_TESTNET).unwrap(), false => serde_json::from_str(DEFAULT_PROVIDERS_MAINNET).unwrap(), }, }; - type ProviderInput = lib::eth::ProviderInput; - let eth_provider: ProviderInput; - - match (rpc_url.clone(), rpc_node) { - (Some(url), Some(_)) => { - println!("passed both node and url for rpc, using url."); - eth_provider = ProviderInput::Ws(url); - } - (Some(url), None) => { - eth_provider = ProviderInput::Ws(url); - } - (None, Some(ref node)) => { - println!("trying to use remote node for rpc: {}", node); - eth_provider = ProviderInput::Node(node.clone()); - } - (None, None) => { - let random_provider = default_pki_entries.choose(&mut rand::thread_rng()).unwrap(); - let default_provider = random_provider.name.clone(); - - println!("no rpc provided, using a default: {}", default_provider); - - eth_provider = ProviderInput::Node(default_provider); - } - } - - if let Err(e) = fs::create_dir_all(home_directory_path).await { - panic!("failed to create home directory: {:?}", e); - } - println!("home at {}\r", home_directory_path); - // kernel receives system messages via this channel, all other modules send messages let (kernel_message_sender, kernel_message_receiver): (MessageSender, MessageReceiver) = mpsc::channel(EVENT_LOOP_CHANNEL_CAPACITY); @@ -268,6 +231,8 @@ async fn main() { mpsc::channel(HTTP_CHANNEL_CAPACITY); let (eth_provider_sender, eth_provider_receiver): (MessageSender, MessageReceiver) = mpsc::channel(ETH_PROVIDER_CHANNEL_CAPACITY); + let (eth_net_error_sender, eth_net_error_receiver): (NetworkErrorSender, NetworkErrorReceiver) = + mpsc::channel(EVENT_LOOP_CHANNEL_CAPACITY); // http client performs http requests on behalf of processes let (http_client_sender, http_client_receiver): (MessageSender, MessageReceiver) = mpsc::channel(HTTP_CLIENT_CHANNEL_CAPACITY); @@ -435,41 +400,49 @@ async fn main() { ( ProcessId::new(Some("http_server"), "distro", "sys"), http_server_sender, + None, false, ), ( ProcessId::new(Some("http_client"), "distro", "sys"), http_client_sender, + None, false, ), ( ProcessId::new(Some("timer"), "distro", "sys"), timer_service_sender, + None, true, ), ( ProcessId::new(Some("eth"), "distro", "sys"), eth_provider_sender, + Some(eth_net_error_sender), false, ), ( ProcessId::new(Some("vfs"), "distro", "sys"), vfs_message_sender, + None, false, ), ( ProcessId::new(Some("state"), "distro", "sys"), state_sender, + None, false, ), ( ProcessId::new(Some("kv"), "distro", "sys"), kv_sender, + None, false, ), ( ProcessId::new(Some("sqlite"), "distro", "sys"), sqlite_sender, + None, false, ), ]; @@ -506,9 +479,22 @@ async fn main() { kernel_debug_message_receiver, net_message_sender.clone(), home_directory_path.clone(), - contract_address.to_string(), + contract_chain_and_address.clone(), runtime_extensions, - default_pki_entries, + // from saved eth provider config, filter for node identities which will be + // bootstrapped into the networking module, so that this node can start + // getting PKI info ("bootstrap") + eth_provider_config + .clone() + .into_iter() + .filter_map(|config| { + if let lib::eth::NodeOrRpcUrl::Node { kns_update, .. } = config.provider { + Some(kns_update) + } else { + None + } + }) + .collect(), )); #[cfg(not(feature = "simulation-mode"))] tasks.spawn(net::networking( @@ -520,8 +506,8 @@ async fn main() { print_sender.clone(), net_message_sender, net_message_receiver, - contract_address.to_string(), - REVEAL_IP, + contract_chain_and_address.1, + *matches.get_one::("reveal-ip").unwrap_or(&true), )); #[cfg(feature = "simulation-mode")] tasks.spawn(net::mock_client( @@ -577,26 +563,15 @@ async fn main() { timer_service_receiver, print_sender.clone(), )); - #[cfg(not(feature = "simulation-mode"))] - tasks.spawn(eth::provider::provider( + tasks.spawn(eth::provider( our.name.clone(), - eth_provider, - public, + eth_provider_config, kernel_message_sender.clone(), eth_provider_receiver, + eth_net_error_receiver, + caps_oracle_sender.clone(), print_sender.clone(), )); - #[cfg(feature = "simulation-mode")] - if let Some(ref rpc_url) = rpc_url { - tasks.spawn(eth::provider::provider( - our.name.clone(), - eth_provider, - public, - kernel_message_sender.clone(), - eth_provider_receiver, - print_sender.clone(), - )); - } tasks.spawn(vfs::vfs( our.name.clone(), kernel_message_sender.clone(), @@ -668,5 +643,4 @@ async fn main() { crossterm::terminal::SetTitle(""), ); println!("\r\n\x1b[38;5;196m{}\x1b[0m", quit_msg); - return; } diff --git a/kinode/src/net/types.rs b/kinode/src/net/types.rs index fd8f11571..da850cfa0 100644 --- a/kinode/src/net/types.rs +++ b/kinode/src/net/types.rs @@ -1,5 +1,6 @@ use dashmap::DashMap; use futures::stream::{SplitSink, SplitStream}; +use lib::types::core::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -7,8 +8,6 @@ use tokio::net::TcpStream; use tokio::sync::mpsc::UnboundedSender; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; -use lib::types::core::*; - /// Sent to a node when you want to connect directly to them. /// Sent in the 'e, ee, s, es' and 's, se' phases of XX noise protocol pattern. #[derive(Debug, Deserialize, Serialize)] @@ -86,35 +85,3 @@ pub struct Peer { pub routing_for: bool, pub sender: UnboundedSender, } - -/// Must be parsed from message pack vector. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetActions { - /// Received from a router of ours when they have a new pending passthrough for us. - /// We should respond (if we desire) by using them to initialize a routed connection - /// with the NodeId given. - ConnectionRequest(NodeId), - /// can only receive from trusted source, for now just ourselves locally, - /// in the future could get from remote provider - KnsUpdate(KnsUpdate), - KnsBatchUpdate(Vec), -} - -/// For now, only sent in response to a ConnectionRequest. -/// Must be parsed from message pack vector -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum NetResponses { - Accepted(NodeId), - Rejected(NodeId), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct KnsUpdate { - pub name: String, // actual username / domain name - pub owner: String, - pub node: String, // hex namehash of node - pub public_key: String, - pub ip: String, - pub port: u16, - pub routers: Vec, -} diff --git a/kinode/src/state.rs b/kinode/src/state.rs index f7f1992a2..372248291 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -18,7 +18,7 @@ pub async fn load_state( our_name: String, keypair: Arc, home_directory_path: String, - runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, + runtime_extensions: Vec<(ProcessId, MessageSender, Option, bool)>, ) -> Result<(ProcessMap, DB, ReverseCapIndex), StateError> { let state_path = format!("{}/kernel", &home_directory_path); @@ -72,7 +72,7 @@ pub async fn load_state( &our_name, keypair, home_directory_path.clone(), - runtime_extensions.clone(), + runtime_extensions, &mut process_map, &mut reverse_cap_index, ) @@ -307,7 +307,7 @@ async fn bootstrap( our_name: &str, keypair: Arc, home_directory_path: String, - runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, + runtime_extensions: Vec<(ProcessId, MessageSender, Option, bool)>, process_map: &mut ProcessMap, reverse_cap_index: &mut ReverseCapIndex, ) -> Result<()> { @@ -382,7 +382,7 @@ async fn bootstrap( wit_version: None, on_exit: OnExit::Restart, capabilities: runtime_caps.clone(), - public: runtime_module.2, + public: runtime_module.3, }); current.capabilities.extend(runtime_caps.clone()); } @@ -391,11 +391,11 @@ async fn bootstrap( for (package_metadata, mut package) in packages.clone() { let package_name = package_metadata.properties.package_name.as_str(); - // special case tester: only load it in if in simulation mode - if package_name == "tester" { - #[cfg(not(feature = "simulation-mode"))] - continue; - } + // // special case tester: only load it in if in simulation mode + // if package_name == "tester" { + // #[cfg(not(feature = "simulation-mode"))] + // continue; + // } println!("fs: handling package {package_name}...\r"); let package_publisher = package_metadata.properties.publisher.as_str(); diff --git a/lib/Cargo.toml b/lib/Cargo.toml index f9d264973..15c6995cd 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -14,8 +14,8 @@ license = "Apache-2.0" reqwest = { version = "0.11.22", features = ["blocking"] } [dependencies] -alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } -alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" } +alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } +alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" } lazy_static = "1.4.0" rand = "0.8.4" ring = "0.16.20" diff --git a/lib/src/core.rs b/lib/src/core.rs index bfbd8203a..b05a0f734 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -547,8 +547,8 @@ fn display_message(m: &Message, delimiter: &str) -> String { format!("expects_response: {:?},", request.expects_response), format!( "body: {},", - match serde_json::from_slice::(&request.body) { - Ok(json) => format!("{}", json), + match std::str::from_utf8(&request.body) { + Ok(str) => str.to_string(), Err(_) => format!("{:?}", request.body), } ), @@ -749,6 +749,10 @@ pub type DebugReceiver = tokio::sync::mpsc::Receiver; pub type CapMessageSender = tokio::sync::mpsc::Sender; pub type CapMessageReceiver = tokio::sync::mpsc::Receiver; +pub type ProcessMessageSender = tokio::sync::mpsc::Sender>; +pub type ProcessMessageReceiver = + tokio::sync::mpsc::Receiver>; + // // types used for onchain identity system // @@ -1522,3 +1526,39 @@ pub enum TimerAction { Debug, SetTimer(u64), } + +// +// networking protocol types +// + +/// Must be parsed from message pack vector. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetActions { + /// Received from a router of ours when they have a new pending passthrough for us. + /// We should respond (if we desire) by using them to initialize a routed connection + /// with the NodeId given. + ConnectionRequest(NodeId), + /// can only receive from trusted source, for now just ourselves locally, + /// in the future could get from remote provider + KnsUpdate(KnsUpdate), + KnsBatchUpdate(Vec), +} + +/// For now, only sent in response to a ConnectionRequest. +/// Must be parsed from message pack vector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetResponses { + Accepted(NodeId), + Rejected(NodeId), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KnsUpdate { + pub name: String, // actual username / domain name + pub owner: String, + pub node: String, // hex namehash of node + pub public_key: String, + pub ip: String, + pub port: u16, + pub routers: Vec, +} diff --git a/lib/src/eth.rs b/lib/src/eth.rs index e36564b96..a66900d86 100644 --- a/lib/src/eth.rs +++ b/lib/src/eth.rs @@ -1,15 +1,18 @@ use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; -/// The Action and Request type that can be made to eth:distro:sys. +/// The Action and Request type that can be made to eth:distro:sys. Any process with messaging +/// capabilities can send this action to the eth provider. /// /// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum EthAction { /// Subscribe to logs with a custom filter. ID is to be used to unsubscribe. /// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults SubscribeLogs { sub_id: u64, + chain_id: u64, kind: SubscriptionKind, params: Params, }, @@ -17,22 +20,25 @@ pub enum EthAction { UnsubscribeLogs(u64), /// Raw request. Used by kinode_process_lib. Request { + chain_id: u64, method: String, params: serde_json::Value, }, } -/// Incoming Result type for subscription updates or errors that processes will receive. +/// Incoming `Request` containing subscription updates or errors that processes will receive. +/// Can deserialize all incoming requests from eth:distro:sys to this type. +/// +/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`. pub type EthSubResult = Result; -/// Incoming Request type for subscription updates. +/// Incoming type for successful subscription updates. #[derive(Debug, Serialize, Deserialize)] pub struct EthSub { pub id: u64, pub result: SubscriptionResult, } -/// Incoming Request for subscription errors that processes will receive. /// If your subscription is closed unexpectedly, you will receive this. #[derive(Debug, Serialize, Deserialize)] pub struct EthSubError { @@ -41,8 +47,11 @@ pub struct EthSubError { } /// The Response type which a process will get from requesting with an [`EthAction`] will be -/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec` +/// of this type, serialized and deserialized using `serde_json::to_vec` /// and `serde_json::from_slice`. +/// +/// In the case of an [`EthAction::SubscribeLogs`] request, the response will indicate if +/// the subscription was successfully created or not. #[derive(Debug, Serialize, Deserialize)] pub enum EthResponse { Ok, @@ -50,20 +59,107 @@ pub enum EthResponse { Err(EthError), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub enum EthError { - /// Underlying transport error - TransportError(String), + /// provider module cannot parse message + MalformedRequest, + /// No RPC provider for the chain + NoRpcForChain, /// Subscription closed SubscriptionClosed(u64), - /// The subscription ID was not found, so we couldn't unsubscribe. - SubscriptionNotFound, /// Invalid method InvalidMethod(String), + /// Invalid parameters + InvalidParams, /// Permission denied - PermissionDenied(String), - /// Internal RPC error - RpcError(String), + PermissionDenied, + /// RPC timed out + RpcTimeout, + /// RPC gave garbage back + RpcMalformedResponse, +} + +/// The action type used for configuring eth:distro:sys. Only processes which have the "root" +/// capability from eth:distro:sys can successfully send this action. +/// +/// NOTE: changes to config will not be persisted between boots, they must be saved in .env +/// to be reflected between boots. TODO: can change this +#[derive(Debug, Serialize, Deserialize)] +pub enum EthConfigAction { + /// Add a new provider to the list of providers. + AddProvider(ProviderConfig), + /// Remove a provider from the list of providers. + /// The tuple is (chain_id, node_id/rpc_url). + RemoveProvider((u64, String)), + /// make our provider public + SetPublic, + /// make our provider not-public + SetPrivate, + /// add node to whitelist on a provider + AllowNode(String), + /// remove node from whitelist on a provider + UnallowNode(String), + /// add node to blacklist on a provider + DenyNode(String), + /// remove node from blacklist on a provider + UndenyNode(String), + /// Set the list of providers to a new list. + /// Replaces all existing saved provider configs. + SetProviders(SavedConfigs), + /// Get the list of current providers as a [`SavedConfigs`] object. + GetProviders, + /// Get the current access settings. + GetAccessSettings, +} + +/// Response type from an [`EthConfigAction`] request. +#[derive(Debug, Serialize, Deserialize)] +pub enum EthConfigResponse { + Ok, + /// Response from a GetProviders request. + /// Note the [`crate::core::KnsUpdate`] will only have the correct `name` field. + /// The rest of the Update is not saved in this module. + Providers(SavedConfigs), + /// Response from a GetAccessSettings request. + AccessSettings(AccessSettings), + /// Permission denied due to missing capability + PermissionDenied, +} + +/// Settings for our ETH provider +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct AccessSettings { + pub public: bool, // whether or not other nodes can access through us + pub allow: HashSet, // whitelist for access (only used if public == false) + pub deny: HashSet, // blacklist for access (always used) +} + +pub type SavedConfigs = Vec; + +/// Provider config. Can currently be a node or a ws provider instance. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ProviderConfig { + pub chain_id: u64, + pub trusted: bool, + pub provider: NodeOrRpcUrl, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum NodeOrRpcUrl { + Node { + kns_update: crate::core::KnsUpdate, + use_as_provider: bool, // for routers inside saved config + }, + RpcUrl(String), +} + +impl std::cmp::PartialEq for NodeOrRpcUrl { + fn eq(&self, other: &str) -> bool { + match self { + NodeOrRpcUrl::Node { kns_update, .. } => kns_update.name == other, + NodeOrRpcUrl::RpcUrl(url) => url == other, + } + } } // @@ -102,8 +198,3 @@ pub fn to_static_str(method: &str) -> Option<&'static str> { _ => None, } } - -pub enum ProviderInput { - Ws(String), - Node(String), -}