From bffcc28e9166f3b29d189ddbdae15cfecf54f29b Mon Sep 17 00:00:00 2001 From: Pedro Carnerero Date: Tue, 7 Oct 2025 19:38:39 +0200 Subject: [PATCH 01/14] feat: support lido modules Added new types in `crates/common/src/types.rs` to model the different Lido modules we'll encounter depending on the Chain. Also added a `HashMap` that introduces a new dimension to the pseudo-map that previously always returned the registry address for the Lido curated module in `crates/common/src/config/mux.rs`, with the goal of being able to return the corresponding address based on the Chain and Lido Module id. To specify the required Lido module, a new optional field was added to `examples/configs/pbs_mux.toml` for lido loader, called `lido_module_id` which always defaults to 1. --- config.example.toml | 2 +- crates/common/src/config/mux.rs | 64 ++++++++++++++++++++++++++------- crates/common/src/types.rs | 22 +++++++++++- examples/configs/pbs_mux.toml | 2 +- 4 files changed, 75 insertions(+), 15 deletions(-) diff --git a/config.example.toml b/config.example.toml index ad0c5340..77c83311 100644 --- a/config.example.toml +++ b/config.example.toml @@ -135,7 +135,7 @@ validator_pubkeys = [ # OPTIONAL loader = "./tests/data/mux_keys.example.json" # loader = { url = "http://localhost:8000/keys" } -# loader = { registry = "lido", node_operator_id = 8 } +# loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1 } # loader = { registry = "ssv", node_operator_id = 8 } late_in_slot_time_ms = 1500 timeout_get_header_ms = 900 diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 346eaf78..3aa3746e 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -22,7 +22,7 @@ use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, pbs::RelayClient, - types::{BlsPublicKey, Chain}, + types::{BlsPublicKey, Chain, HoleskyModules, HoodiModules, 
MainnetModules}, }; #[derive(Debug, Deserialize, Serialize)] @@ -167,6 +167,8 @@ pub enum MuxKeysLoader { Registry { registry: NORegistry, node_operator_id: u64, + #[serde(default)] + lido_module_id: Option }, } @@ -210,7 +212,7 @@ impl MuxKeysLoader { .wrap_err("failed to fetch mux keys from HTTP endpoint") } - Self::Registry { registry, node_operator_id } => match registry { + Self::Registry { registry, node_operator_id, lido_module_id } => match registry { NORegistry::Lido => { let Some(rpc_url) = rpc_url else { bail!("Lido registry requires RPC URL to be set in the PBS config"); @@ -220,6 +222,7 @@ impl MuxKeysLoader { rpc_url, chain, U256::from(*node_operator_id), + *lido_module_id, http_timeout, ) .await @@ -257,21 +260,58 @@ sol! { "src/abi/LidoNORegistry.json" } -// Fetching Lido Curated Module -fn lido_registry_address(chain: Chain) -> eyre::Result
{ - match chain { - Chain::Mainnet => Ok(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")), - Chain::Holesky => Ok(address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")), - Chain::Hoodi => Ok(address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")), - Chain::Sepolia => Ok(address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")), - _ => bail!("Lido registry not supported for chain: {chain:?}"), - } +fn lido_registry_addresses_by_module() -> HashMap> { + let mut map: HashMap> = HashMap::new(); + + // --- Mainnet --- + let mut mainnet = HashMap::new(); + mainnet.insert(MainnetModules::Curated as u8, address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")); + mainnet.insert(MainnetModules::SimpleDVT as u8, address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433")); + mainnet.insert(MainnetModules::CommunityStaking as u8, address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F")); + map.insert(Chain::Mainnet, mainnet); + + // --- Holesky --- + let mut holesky = HashMap::new(); + holesky.insert(HoleskyModules::Curated as u8, address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")); + holesky.insert(HoleskyModules::SimpleDVT as u8, address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6")); + holesky.insert(HoleskyModules::Sandbox as u8, address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC")); + holesky.insert(HoleskyModules::CommunityStaking as u8, address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f")); + map.insert(Chain::Holesky, holesky); + + // --- Hoodi --- + let mut hoodi = HashMap::new(); + hoodi.insert(HoodiModules::Curated as u8, address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")); + hoodi.insert(HoodiModules::SimpleDVT as u8, address!("0B5236BECA68004DB89434462DfC3BB074d2c830")); + hoodi.insert(HoodiModules::Sandbox as u8, address!("682E94d2630846a503BDeE8b6810DF71C9806891")); + hoodi.insert(HoodiModules::CommunityStaking as u8, address!("79CEf36D84743222f37765204Bec41E92a93E59d")); + map.insert(Chain::Hoodi, hoodi); + + // --- Sepolia -- + let mut sepolia = HashMap::new(); + 
sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")); + map.insert(Chain::Sepolia, sepolia); + + map +} + +// Fetching appropiate registry address +fn lido_registry_address(chain: Chain, maybe_module: Option) -> eyre::Result
{ + lido_registry_addresses_by_module() + .get(&chain) + .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? + .get(&maybe_module.unwrap_or(1)) + .copied() + .ok_or_else(|| eyre::eyre!( + "Lido module id {:?} not found for chain: {chain:?}", + maybe_module.unwrap_or(1) + )) } async fn fetch_lido_registry_keys( rpc_url: Url, chain: Chain, node_operator_id: U256, + lido_module_id: Option, http_timeout: Duration, ) -> eyre::Result> { debug!(?chain, %node_operator_id, "loading operator keys from Lido registry"); @@ -283,7 +323,7 @@ async fn fetch_lido_registry_keys( let rpc_client = RpcClient::new(http, is_local); let provider = ProviderBuilder::new().connect_client(rpc_client); - let registry_address = lido_registry_address(chain)?; + let registry_address = lido_registry_address(chain, lido_module_id)?; let registry = LidoRegistry::new(registry_address, provider); let total_keys = registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 077b4ccd..ef98ef22 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -29,7 +29,7 @@ pub struct JwtClaims { pub module: String, } -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq, Hash)] pub enum Chain { Mainnet, Holesky, @@ -44,6 +44,26 @@ pub enum Chain { }, } +pub enum MainnetModules { + Curated = 1, + SimpleDVT = 2, + CommunityStaking = 3 +} + +pub enum HoleskyModules { + Curated = 1, + SimpleDVT = 2, + Sandbox = 3, + CommunityStaking = 4 +} + +pub enum HoodiModules { + Curated = 1, + SimpleDVT = 2, + Sandbox = 3, + CommunityStaking = 4 +} + pub type ForkVersion = [u8; 4]; impl std::fmt::Display for Chain { diff --git a/examples/configs/pbs_mux.toml b/examples/configs/pbs_mux.toml index 3ea9f355..fcf4ea8c 100644 --- a/examples/configs/pbs_mux.toml +++ b/examples/configs/pbs_mux.toml @@ -33,7 +33,7 @@ target_first_request_ms = 200 [[mux]] id = "lido-mux" 
-loader = { registry = "lido", node_operator_id = 8 } +loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1 } [[mux.relays]] id = "relay-3" From abbd88cac97902d2d809b75ce5f56e16b356b46c Mon Sep 17 00:00:00 2001 From: Pedro Carnerero Date: Wed, 8 Oct 2025 09:54:06 +0200 Subject: [PATCH 02/14] refactor: rename lido module enums to singular and more descriptive name --- crates/common/src/config/mux.rs | 24 ++++++++++++------------ crates/common/src/types.rs | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 3aa3746e..6c629959 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -22,7 +22,7 @@ use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, pbs::RelayClient, - types::{BlsPublicKey, Chain, HoleskyModules, HoodiModules, MainnetModules}, + types::{BlsPublicKey, Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule}, }; #[derive(Debug, Deserialize, Serialize)] @@ -265,25 +265,25 @@ fn lido_registry_addresses_by_module() -> HashMap> { // --- Mainnet --- let mut mainnet = HashMap::new(); - mainnet.insert(MainnetModules::Curated as u8, address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")); - mainnet.insert(MainnetModules::SimpleDVT as u8, address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433")); - mainnet.insert(MainnetModules::CommunityStaking as u8, address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F")); + mainnet.insert(MainnetLidoModule::Curated as u8, address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")); + mainnet.insert(MainnetLidoModule::SimpleDVT as u8, address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433")); + mainnet.insert(MainnetLidoModule::CommunityStaking as u8, address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F")); map.insert(Chain::Mainnet, mainnet); // --- Holesky --- let mut holesky = HashMap::new(); - 
holesky.insert(HoleskyModules::Curated as u8, address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")); - holesky.insert(HoleskyModules::SimpleDVT as u8, address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6")); - holesky.insert(HoleskyModules::Sandbox as u8, address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC")); - holesky.insert(HoleskyModules::CommunityStaking as u8, address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f")); + holesky.insert(HoleskyLidoModule::Curated as u8, address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")); + holesky.insert(HoleskyLidoModule::SimpleDVT as u8, address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6")); + holesky.insert(HoleskyLidoModule::Sandbox as u8, address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC")); + holesky.insert(HoleskyLidoModule::CommunityStaking as u8, address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f")); map.insert(Chain::Holesky, holesky); // --- Hoodi --- let mut hoodi = HashMap::new(); - hoodi.insert(HoodiModules::Curated as u8, address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")); - hoodi.insert(HoodiModules::SimpleDVT as u8, address!("0B5236BECA68004DB89434462DfC3BB074d2c830")); - hoodi.insert(HoodiModules::Sandbox as u8, address!("682E94d2630846a503BDeE8b6810DF71C9806891")); - hoodi.insert(HoodiModules::CommunityStaking as u8, address!("79CEf36D84743222f37765204Bec41E92a93E59d")); + hoodi.insert(HoodiLidoModule::Curated as u8, address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")); + hoodi.insert(HoodiLidoModule::SimpleDVT as u8, address!("0B5236BECA68004DB89434462DfC3BB074d2c830")); + hoodi.insert(HoodiLidoModule::Sandbox as u8, address!("682E94d2630846a503BDeE8b6810DF71C9806891")); + hoodi.insert(HoodiLidoModule::CommunityStaking as u8, address!("79CEf36D84743222f37765204Bec41E92a93E59d")); map.insert(Chain::Hoodi, hoodi); // --- Sepolia -- diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index ef98ef22..e5eb593c 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -44,20 
+44,20 @@ pub enum Chain { }, } -pub enum MainnetModules { +pub enum MainnetLidoModule { Curated = 1, SimpleDVT = 2, CommunityStaking = 3 } -pub enum HoleskyModules { +pub enum HoleskyLidoModule { Curated = 1, SimpleDVT = 2, Sandbox = 3, CommunityStaking = 4 } -pub enum HoodiModules { +pub enum HoodiLidoModule { Curated = 1, SimpleDVT = 2, Sandbox = 3, From edc7debc57466bf81063079b9acb66f588cffa5a Mon Sep 17 00:00:00 2001 From: Pedro Carnerero Date: Wed, 8 Oct 2025 12:02:59 +0200 Subject: [PATCH 03/14] chore: include lido_module_id in debug log --- crates/common/src/config/mux.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 6c629959..3de4ed0b 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -314,7 +314,7 @@ async fn fetch_lido_registry_keys( lido_module_id: Option, http_timeout: Duration, ) -> eyre::Result> { - debug!(?chain, %node_operator_id, "loading operator keys from Lido registry"); + debug!(?chain, %node_operator_id, ?lido_module_id, "loading operator keys from Lido registry"); // Create an RPC provider with HTTP timeout support let client = Client::builder().timeout(http_timeout).build()?; From 1d2dcb1035ac34575d7cddbd1d1227a999c0c75b Mon Sep 17 00:00:00 2001 From: Pedro Carnerero Date: Wed, 8 Oct 2025 17:22:01 +0200 Subject: [PATCH 04/14] refactor: make lido_module_id argument mandatory --- crates/common/src/config/mux.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 3de4ed0b..de3a4d6d 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -222,7 +222,7 @@ impl MuxKeysLoader { rpc_url, chain, U256::from(*node_operator_id), - *lido_module_id, + lido_module_id.unwrap_or(1), http_timeout, ) .await @@ -295,15 +295,15 @@ fn lido_registry_addresses_by_module() -> HashMap> { } // Fetching appropiate 
registry address -fn lido_registry_address(chain: Chain, maybe_module: Option) -> eyre::Result
{ +fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result
{ lido_registry_addresses_by_module() .get(&chain) .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? - .get(&maybe_module.unwrap_or(1)) + .get(&lido_module_id) .copied() .ok_or_else(|| eyre::eyre!( "Lido module id {:?} not found for chain: {chain:?}", - maybe_module.unwrap_or(1) + lido_module_id )) } @@ -311,7 +311,7 @@ async fn fetch_lido_registry_keys( rpc_url: Url, chain: Chain, node_operator_id: U256, - lido_module_id: Option, + lido_module_id: u8, http_timeout: Duration, ) -> eyre::Result> { debug!(?chain, %node_operator_id, ?lido_module_id, "loading operator keys from Lido registry"); From 74e32d1e7e41a631994bfb029e93e17e24138efa Mon Sep 17 00:00:00 2001 From: Pedro Carnerero Date: Wed, 8 Oct 2025 20:06:00 +0200 Subject: [PATCH 05/14] feat: support lido community staking module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added support for Lido's Community Staking Module (CSM), since the deployed contract has a different ABI, so the same registry used for the other modules cannot be used. Because this difference in ABIs, the CSM contract ABI has been imported, containing the required functions. One key difference is that in the CSM contract, `getTotalSigningKeyCount´ does not exist; therefore, for CSM this call is replaced with `getNodeOperatorSummary´, which returns a tuple of several values, including `totalDepositedValidators´ and `depositableValidatorsCount´, whose sum is used to compute the total number of keys. Another difference between ABIs is the `getSigningKeys´ method, which in CSM returns directly a `Bytes´ type. The logic for obtaining `total_keys´ and the iteration to fetch the keys has been extracted into smaller functions since this new case for CSM encourages code duplication; therefore, the main function `fetch_lido_registry_keys´ clearly separate both cases. 
--- .../src/abi/LidoCSModuleNORegistry.json | 37 +++ crates/common/src/config/mux.rs | 235 ++++++++++++++++-- 2 files changed, 247 insertions(+), 25 deletions(-) create mode 100644 crates/common/src/abi/LidoCSModuleNORegistry.json diff --git a/crates/common/src/abi/LidoCSModuleNORegistry.json b/crates/common/src/abi/LidoCSModuleNORegistry.json new file mode 100644 index 00000000..a0b98aab --- /dev/null +++ b/crates/common/src/abi/LidoCSModuleNORegistry.json @@ -0,0 +1,37 @@ +[ + { + "constant": true, + "inputs": [ + { "name": "nodeOperatorId", "type": "uint256" } + ], + "name": "getNodeOperatorSummary", + "outputs": [ + { "name": "targetLimitMode", "type": "uint256" }, + { "name": "targetValidatorsCount", "type": "uint256" }, + { "name": "stuckValidatorsCount", "type": "uint256" }, + { "name": "refundedValidatorsCount", "type": "uint256" }, + { "name": "stuckPenaltyEndTimestamp", "type": "uint256" }, + { "name": "totalExitedValidators", "type": "uint256" }, + { "name": "totalDepositedValidators", "type": "uint256" }, + { "name": "depositableValidatorsCount", "type": "uint256" } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { "name": "nodeOperatorId", "type": "uint256" }, + { "name": "startIndex", "type": "uint256" }, + { "name": "keysCount", "type": "uint256" } + ], + "name": "getSigningKeys", + "outputs": [ + { "name": "", "type": "bytes" } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + } +] diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index de3a4d6d..89b951c0 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -6,17 +6,18 @@ use std::{ }; use alloy::{ - primitives::{Address, U256, address}, + primitives::{address, Address, Bytes, U256}, providers::ProviderBuilder, rpc::{client::RpcClient, types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN}, sol, transports::http::Http, }; -use eyre::{Context, 
bail, ensure}; +use eyre::{bail, ensure, Context}; use reqwest::Client; use serde::{Deserialize, Deserializer, Serialize}; use tracing::{debug, info, warn}; use url::Url; +use LidoCSMRegistry::getNodeOperatorSummaryReturn; use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ @@ -260,6 +261,13 @@ sol! { "src/abi/LidoNORegistry.json" } +sol! { + #[allow(missing_docs)] + #[sol(rpc)] + LidoCSMRegistry, + "src/abi/LidoCSModuleNORegistry.json" +} + fn lido_registry_addresses_by_module() -> HashMap> { let mut map: HashMap> = HashMap::new(); @@ -307,46 +315,128 @@ fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result bool { + match chain { + Chain::Mainnet => module_id == MainnetLidoModule::CommunityStaking as u8, + Chain::Holesky => module_id == HoleskyLidoModule::CommunityStaking as u8, + Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8, + _ => false, + } +} + +fn get_lido_csm_registry

( + registry_address: Address, + provider: P, +) -> LidoCSMRegistry::LidoCSMRegistryInstance

+where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + LidoCSMRegistry::new(registry_address, provider) +} + +fn get_lido_module_registry

( + registry_address: Address, + provider: P, +) -> LidoRegistry::LidoRegistryInstance

+where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + LidoRegistry::new(registry_address, provider) +} + +async fn fetch_lido_csm_keys_total

( + registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, node_operator_id: U256, - lido_module_id: u8, - http_timeout: Duration, -) -> eyre::Result> { - debug!(?chain, %node_operator_id, ?lido_module_id, "loading operator keys from Lido registry"); +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let summary: getNodeOperatorSummaryReturn = registry + .getNodeOperatorSummary(node_operator_id) + .call() + .await?; - // Create an RPC provider with HTTP timeout support - let client = Client::builder().timeout(http_timeout).build()?; - let http = Http::with_client(client, rpc_url); - let is_local = http.guess_local(); - let rpc_client = RpcClient::new(http, is_local); - let provider = ProviderBuilder::new().connect_client(rpc_client); + let total_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; - let registry_address = lido_registry_address(chain, lido_module_id)?; - let registry = LidoRegistry::new(registry_address, provider); + let total_u64 = u64::try_from(total_u256) + .wrap_err_with(|| format!("total keys ({total_u256}) does not fit into u64"))?; + + Ok(total_u64) +} + +async fn fetch_lido_module_keys_total

( + registry: &LidoRegistry::LidoRegistryInstance

, + node_operator_id: U256, +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let total_keys: u64 = registry + .getTotalSigningKeyCount(node_operator_id) + .call() + .await? + .try_into()?; + + Ok(total_keys) +} + +async fn fetch_lido_csm_keys_batch

( + registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, + node_operator_id: U256, + offset: u64, + limit: u64 +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) + .call() + .await?; + + Ok(pubkeys) +} - let total_keys = registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; +async fn fetch_lido_module_keys_batch

( + registry: &LidoRegistry::LidoRegistryInstance

, + node_operator_id: U256, + offset: u64, + limit: u64 +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) + .call() + .await? + .pubkeys; + + Ok(pubkeys) +} +async fn collect_registry_keys( + total_keys: u64, + mut fetch_batch: F, +) -> eyre::Result> +where + F: FnMut(u64, u64) -> Fut, + Fut: std::future::Future>, +{ if total_keys == 0 { return Ok(Vec::new()); } - debug!("fetching {total_keys} total keys"); const CALL_BATCH_SIZE: u64 = 250u64; let mut keys = vec![]; - let mut offset = 0; + let mut offset: u64 = 0; while offset < total_keys { let limit = CALL_BATCH_SIZE.min(total_keys - offset); - let pubkeys = registry - .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) - .call() - .await? - .pubkeys; + let pubkeys = fetch_batch(offset, limit).await?; ensure!( pubkeys.len() % BLS_PUBLIC_KEY_BYTES_LEN == 0, @@ -373,6 +463,58 @@ async fn fetch_lido_registry_keys( Ok(keys) } +async fn fetch_lido_csm_registry_keys ( + registry_address: Address, + rpc_client: RpcClient, + node_operator_id: U256, +) -> eyre::Result> { + let provider = ProviderBuilder::new().connect_client(rpc_client); + let registry = get_lido_csm_registry(registry_address, provider); + + let total_keys = fetch_lido_csm_keys_total(®istry, node_operator_id).await?.try_into()?; + + collect_registry_keys(total_keys, |offset, limit| { + fetch_lido_csm_keys_batch(®istry, node_operator_id, offset, limit) + }).await +} + +async fn fetch_lido_module_registry_keys ( + registry_address: Address, + rpc_client: RpcClient, + node_operator_id: U256, +) -> eyre::Result> { + let provider = ProviderBuilder::new().connect_client(rpc_client); + let registry = get_lido_module_registry(registry_address, provider); + let total_keys: u64 = fetch_lido_module_keys_total(®istry, node_operator_id).await?.try_into()?; + + collect_registry_keys(total_keys, |offset, 
limit| { + fetch_lido_module_keys_batch(®istry, node_operator_id, offset, limit) + }).await +} + +async fn fetch_lido_registry_keys( + rpc_url: Url, + chain: Chain, + node_operator_id: U256, + lido_module_id: u8, + http_timeout: Duration, +) -> eyre::Result> { + debug!(?chain, %node_operator_id, ?lido_module_id, "loading operator keys from Lido registry"); + + // Create an RPC provider with HTTP timeout support + let client = Client::builder().timeout(http_timeout).build()?; + let http = Http::with_client(client, rpc_url); + let is_local = http.guess_local(); + let rpc_client = RpcClient::new(http, is_local); + let registry_address = lido_registry_address(chain, lido_module_id)?; + + if is_csm_module(chain, lido_module_id) { + fetch_lido_csm_registry_keys(registry_address, rpc_client, node_operator_id).await + } else { + fetch_lido_module_registry_keys(registry_address, rpc_client, node_operator_id).await + } +} + async fn fetch_ssv_pubkeys( chain: Chain, node_operator_id: U256, @@ -520,6 +662,49 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_lido_csm_registry_address() -> eyre::Result<()> { + use alloy::{primitives::U256, providers::ProviderBuilder}; + + let url = Url::parse("https://ethereum-rpc.publicnode.com")?; + let provider = ProviderBuilder::new().connect_http(url); + + let registry = LidoCSMRegistry::new( + address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), + provider, + ); + + const LIMIT: usize = 3; + let node_operator_id = U256::from(1); + + let summary = registry + .getNodeOperatorSummary(node_operator_id) + .call() + .await?; + + let total_keys_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; + let total_keys: u64 = total_keys_u256.try_into()?; + + assert!(total_keys > LIMIT as u64, "expected more than {LIMIT} keys, got {total_keys}"); + + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)) + .call() + .await?; + + let mut vec = Vec::new(); + for chunk in 
pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { + vec.push( + BlsPublicKey::deserialize(chunk) + .map_err(|_| eyre::eyre!("invalid BLS public key"))?, + ); + } + + assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len()); + + Ok(()) + } + #[tokio::test] /// Tests that a successful SSV network fetch is handled and parsed properly async fn test_ssv_network_fetch() -> eyre::Result<()> { From 0ad55ac9059ee4aa327f138c90475327aa2a1acb Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 21 Oct 2025 10:57:24 -0400 Subject: [PATCH 06/14] Add automatic registry mux refreshing (#384) Co-authored-by: ltitanb <163874448+ltitanb@users.noreply.github.com> --- Cargo.lock | 28 +- config.example.toml | 19 +- crates/common/Cargo.toml | 3 + crates/common/src/config/mux.rs | 488 ++++++-------------- crates/common/src/config/pbs.rs | 86 ++-- crates/common/src/interop/mod.rs | 1 + crates/common/src/interop/ssv/mod.rs | 2 + crates/common/src/interop/ssv/types.rs | 61 +++ crates/common/src/interop/ssv/utils.rs | 24 + crates/common/src/lib.rs | 1 + crates/common/src/pbs/constants.rs | 2 + crates/common/src/utils.rs | 15 +- crates/pbs/src/routes/get_header.rs | 2 +- crates/pbs/src/routes/register_validator.rs | 2 +- crates/pbs/src/routes/reload.rs | 2 +- crates/pbs/src/routes/status.rs | 2 +- crates/pbs/src/routes/submit_block.rs | 2 +- crates/pbs/src/service.rs | 147 +++++- crates/pbs/src/state.rs | 2 +- tests/Cargo.toml | 4 + tests/src/lib.rs | 1 + tests/src/mock_ssv.rs | 108 +++++ tests/src/utils.rs | 12 +- tests/tests/pbs_mux.rs | 143 +++++- tests/tests/pbs_mux_refresh.rs | 175 +++++++ 25 files changed, 941 insertions(+), 391 deletions(-) create mode 100644 crates/common/src/interop/mod.rs create mode 100644 crates/common/src/interop/ssv/mod.rs create mode 100644 crates/common/src/interop/ssv/types.rs create mode 100644 crates/common/src/interop/ssv/utils.rs create mode 100644 tests/src/mock_ssv.rs create mode 100644 tests/tests/pbs_mux_refresh.rs diff --git a/Cargo.lock 
b/Cargo.lock index e3fc4fa8..906a8507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1732,6 +1732,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "tracing-test", "tree_hash", "types", "url", @@ -2747,7 +2748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -5451,7 +5452,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6271,7 +6272,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6809,6 +6810,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.106", +] + [[package]] name = "tree_hash" version = "0.10.0" diff --git a/config.example.toml b/config.example.toml index 77c83311..6ea6a1b6 100644 --- a/config.example.toml +++ b/config.example.toml @@ -55,6 +55,9 @@ extra_validation_enabled = false # Execution Layer RPC url to use for extra validation # OPTIONAL # rpc_url = "https://ethereum-holesky-rpc.publicnode.com" +# URL of the SSV API server to use, if you have a mux that targets an SSV node operator +# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4" +# ssv_api_url = "https://api.ssv.network/api/v4" # Timeout for any HTTP requests sent from the PBS module to other services, in seconds # OPTIONAL, 
DEFAULT: 10 http_timeout_seconds = 10 @@ -64,6 +67,13 @@ register_validator_retry_limit = 3 # Maximum number of validators to register in a single request. # OPTIONAL, DEFAULT: "" (unlimited) validator_registration_batch_size = "" +# For any Registry-based Mux configurations that have dynamic pubkey +# refreshing enabled, this is how often to refresh the list of pubkeys +# from the registry, in seconds. Enabling registry refreshing is done per-mux +# with the mux's `enable_refreshing` property. If none of the muxes have it +# enabled, this value will not be used. +# OPTIONAL, DEFAULT: 384 +mux_registry_refresh_interval_seconds = 384 # The PBS module needs one or more [[relays]] as defined below. [[relays]] @@ -126,6 +136,11 @@ validator_pubkeys = [ # - Registry: details of a registry to load keys from. Supported registries: # - Lido: NodeOperatorsRegistry # - SSV: SSV API +# You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. +# Each of these registry entries must be unique: +# - There can only be one Lido entry with a given Lido node operator ID. +# - There can only be one SSV entry with a given SSV node operator ID. +# - A Lido entry can have the same node operator ID as an SSV entry if they happen to coincide; they're treated as separate entities. 
# # Example JSON list: # [ @@ -135,8 +150,8 @@ validator_pubkeys = [ # OPTIONAL loader = "./tests/data/mux_keys.example.json" # loader = { url = "http://localhost:8000/keys" } -# loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1 } -# loader = { registry = "ssv", node_operator_id = 8 } +# loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1, enable_refreshing = false } +# loader = { registry = "ssv", node_operator_id = 8, enable_refreshing = false } late_in_slot_time_ms = 1500 timeout_get_header_ms = 900 # For each mux, one or more [[mux.relays]] can be defined, which will be used for the matching validator pubkeys diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index ea5c0199..57a5fbb3 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -5,6 +5,9 @@ publish = false rust-version.workspace = true version.workspace = true +[features] +testing-flags = [] + [dependencies] aes.workspace = true alloy.workspace = true diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 89b951c0..86318174 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -1,32 +1,35 @@ use std::{ collections::{HashMap, HashSet}, + hash::Hash, path::{Path, PathBuf}, sync::Arc, time::Duration, }; +use LidoCSMRegistry::getNodeOperatorSummaryReturn; use alloy::{ - primitives::{address, Address, Bytes, U256}, + primitives::{Address, Bytes, U256, address}, providers::ProviderBuilder, rpc::{client::RpcClient, types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN}, sol, transports::http::Http, }; -use eyre::{bail, ensure, Context}; +use eyre::{Context, bail, ensure}; use reqwest::Client; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use tracing::{debug, info, warn}; use url::Url; -use LidoCSMRegistry::getNodeOperatorSummaryReturn; use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ 
config::{remove_duplicate_keys, safe_read_http_response}, + interop::ssv::utils::fetch_ssv_pubkeys_from_url, pbs::RelayClient, types::{BlsPublicKey, Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule}, + utils::default_bool, }; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PbsMuxes { /// List of PBS multiplexers #[serde(rename = "mux")] @@ -45,7 +48,10 @@ impl PbsMuxes { self, chain: Chain, default_pbs: &PbsConfig, - ) -> eyre::Result> { + ) -> eyre::Result<( + HashMap, + HashMap, + )> { let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); let mut muxes = self.muxes; @@ -54,8 +60,15 @@ impl PbsMuxes { ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id); if let Some(loader) = &mux.loader { - let extra_keys = - loader.load(&mux.id, chain, default_pbs.rpc_url.clone(), http_timeout).await?; + let extra_keys = loader + .load( + &mux.id, + chain, + default_pbs.ssv_api_url.clone(), + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await?; mux.validator_pubkeys.extend(extra_keys); } @@ -77,6 +90,7 @@ impl PbsMuxes { } let mut configs = HashMap::new(); + let mut registry_muxes = HashMap::new(); // fill the configs using the default pbs config and relay entries for mux in muxes { info!( @@ -103,18 +117,30 @@ impl PbsMuxes { config.validate(chain).await?; let config = Arc::new(config); + // Build the map of pubkeys to mux configs let runtime_config = RuntimeMuxConfig { id: mux.id, config, relays: relay_clients }; for pubkey in mux.validator_pubkeys.into_iter() { configs.insert(pubkey, runtime_config.clone()); } + + // Track registry muxes with refreshing enabled + if let Some(loader) = &mux.loader && + let MuxKeysLoader::Registry { enable_refreshing: true, .. 
} = loader + { + info!( + "mux {} uses registry loader with dynamic refreshing enabled", + runtime_config.id + ); + registry_muxes.insert(loader.clone(), runtime_config.clone()); + } } - Ok(configs) + Ok((configs, registry_muxes)) } } /// Configuration for the PBS Multiplexer -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct MuxConfig { /// Identifier for this mux config pub id: String, @@ -157,7 +183,7 @@ impl MuxConfig { } } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] #[serde(untagged)] pub enum MuxKeysLoader { /// A file containing a list of validator pubkeys @@ -169,11 +195,13 @@ pub enum MuxKeysLoader { registry: NORegistry, node_operator_id: u64, #[serde(default)] - lido_module_id: Option + lido_module_id: Option, + #[serde(default = "default_bool::")] + enable_refreshing: bool, }, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] pub enum NORegistry { #[serde(alias = "lido")] Lido, @@ -186,6 +214,7 @@ impl MuxKeysLoader { &self, mux_id: &str, chain: Chain, + ssv_api_url: Url, rpc_url: Option, http_timeout: Duration, ) -> eyre::Result> { @@ -213,25 +242,33 @@ impl MuxKeysLoader { .wrap_err("failed to fetch mux keys from HTTP endpoint") } - Self::Registry { registry, node_operator_id, lido_module_id } => match registry { - NORegistry::Lido => { - let Some(rpc_url) = rpc_url else { - bail!("Lido registry requires RPC URL to be set in the PBS config"); - }; - - fetch_lido_registry_keys( - rpc_url, - chain, - U256::from(*node_operator_id), - lido_module_id.unwrap_or(1), - http_timeout, - ) - .await + Self::Registry { registry, node_operator_id, lido_module_id, enable_refreshing: _ } => { + match registry { + NORegistry::Lido => { + let Some(rpc_url) = rpc_url else { + bail!("Lido registry requires RPC URL to be set in the PBS config"); + }; + + 
fetch_lido_registry_keys( + rpc_url, + chain, + U256::from(*node_operator_id), + lido_module_id.unwrap_or(1), + http_timeout, + ) + .await + } + NORegistry::SSV => { + fetch_ssv_pubkeys( + ssv_api_url, + chain, + U256::from(*node_operator_id), + http_timeout, + ) + .await + } } - NORegistry::SSV => { - fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await - } - }, + } }?; // Remove duplicates @@ -273,25 +310,58 @@ fn lido_registry_addresses_by_module() -> HashMap> { // --- Mainnet --- let mut mainnet = HashMap::new(); - mainnet.insert(MainnetLidoModule::Curated as u8, address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")); - mainnet.insert(MainnetLidoModule::SimpleDVT as u8, address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433")); - mainnet.insert(MainnetLidoModule::CommunityStaking as u8, address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F")); + mainnet.insert( + MainnetLidoModule::Curated as u8, + address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), + ); + mainnet.insert( + MainnetLidoModule::SimpleDVT as u8, + address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"), + ); + mainnet.insert( + MainnetLidoModule::CommunityStaking as u8, + address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), + ); map.insert(Chain::Mainnet, mainnet); // --- Holesky --- let mut holesky = HashMap::new(); - holesky.insert(HoleskyLidoModule::Curated as u8, address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")); - holesky.insert(HoleskyLidoModule::SimpleDVT as u8, address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6")); - holesky.insert(HoleskyLidoModule::Sandbox as u8, address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC")); - holesky.insert(HoleskyLidoModule::CommunityStaking as u8, address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f")); + holesky.insert( + HoleskyLidoModule::Curated as u8, + address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"), + ); + holesky.insert( + HoleskyLidoModule::SimpleDVT as u8, + address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"), + ); + 
holesky.insert( + HoleskyLidoModule::Sandbox as u8, + address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"), + ); + holesky.insert( + HoleskyLidoModule::CommunityStaking as u8, + address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"), + ); map.insert(Chain::Holesky, holesky); // --- Hoodi --- let mut hoodi = HashMap::new(); - hoodi.insert(HoodiLidoModule::Curated as u8, address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")); - hoodi.insert(HoodiLidoModule::SimpleDVT as u8, address!("0B5236BECA68004DB89434462DfC3BB074d2c830")); - hoodi.insert(HoodiLidoModule::Sandbox as u8, address!("682E94d2630846a503BDeE8b6810DF71C9806891")); - hoodi.insert(HoodiLidoModule::CommunityStaking as u8, address!("79CEf36D84743222f37765204Bec41E92a93E59d")); + hoodi.insert( + HoodiLidoModule::Curated as u8, + address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"), + ); + hoodi.insert( + HoodiLidoModule::SimpleDVT as u8, + address!("0B5236BECA68004DB89434462DfC3BB074d2c830"), + ); + hoodi.insert( + HoodiLidoModule::Sandbox as u8, + address!("682E94d2630846a503BDeE8b6810DF71C9806891"), + ); + hoodi.insert( + HoodiLidoModule::CommunityStaking as u8, + address!("79CEf36D84743222f37765204Bec41E92a93E59d"), + ); map.insert(Chain::Hoodi, hoodi); // --- Sepolia -- @@ -304,22 +374,21 @@ fn lido_registry_addresses_by_module() -> HashMap> { // Fetching appropiate registry address fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result

{ - lido_registry_addresses_by_module() - .get(&chain) + lido_registry_addresses_by_module() + .get(&chain) .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? .get(&lido_module_id) .copied() - .ok_or_else(|| eyre::eyre!( - "Lido module id {:?} not found for chain: {chain:?}", - lido_module_id - )) + .ok_or_else(|| { + eyre::eyre!("Lido module id {:?} not found for chain: {chain:?}", lido_module_id) + }) } fn is_csm_module(chain: Chain, module_id: u8) -> bool { match chain { Chain::Mainnet => module_id == MainnetLidoModule::CommunityStaking as u8, Chain::Holesky => module_id == HoleskyLidoModule::CommunityStaking as u8, - Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8, + Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8, _ => false, } } @@ -351,10 +420,8 @@ async fn fetch_lido_csm_keys_total

( where P: Clone + Send + Sync + 'static + alloy::providers::Provider, { - let summary: getNodeOperatorSummaryReturn = registry - .getNodeOperatorSummary(node_operator_id) - .call() - .await?; + let summary: getNodeOperatorSummaryReturn = + registry.getNodeOperatorSummary(node_operator_id).call().await?; let total_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; @@ -371,11 +438,8 @@ async fn fetch_lido_module_keys_total

( where P: Clone + Send + Sync + 'static + alloy::providers::Provider, { - let total_keys: u64 = registry - .getTotalSigningKeyCount(node_operator_id) - .call() - .await? - .try_into()?; + let total_keys: u64 = + registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; Ok(total_keys) } @@ -384,7 +448,7 @@ async fn fetch_lido_csm_keys_batch

( registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, node_operator_id: U256, offset: u64, - limit: u64 + limit: u64, ) -> eyre::Result where P: Clone + Send + Sync + 'static + alloy::providers::Provider, @@ -401,7 +465,7 @@ async fn fetch_lido_module_keys_batch

( registry: &LidoRegistry::LidoRegistryInstance

, node_operator_id: U256, offset: u64, - limit: u64 + limit: u64, ) -> eyre::Result where P: Clone + Send + Sync + 'static + alloy::providers::Provider, @@ -418,7 +482,7 @@ where async fn collect_registry_keys( total_keys: u64, mut fetch_batch: F, -) -> eyre::Result> +) -> eyre::Result> where F: FnMut(u64, u64) -> Fut, Fut: std::future::Future>, @@ -463,33 +527,35 @@ where Ok(keys) } -async fn fetch_lido_csm_registry_keys ( +async fn fetch_lido_csm_registry_keys( registry_address: Address, rpc_client: RpcClient, node_operator_id: U256, ) -> eyre::Result> { let provider = ProviderBuilder::new().connect_client(rpc_client); let registry = get_lido_csm_registry(registry_address, provider); - - let total_keys = fetch_lido_csm_keys_total(®istry, node_operator_id).await?.try_into()?; + + let total_keys = fetch_lido_csm_keys_total(®istry, node_operator_id).await?; collect_registry_keys(total_keys, |offset, limit| { fetch_lido_csm_keys_batch(®istry, node_operator_id, offset, limit) - }).await + }) + .await } -async fn fetch_lido_module_registry_keys ( +async fn fetch_lido_module_registry_keys( registry_address: Address, rpc_client: RpcClient, node_operator_id: U256, ) -> eyre::Result> { let provider = ProviderBuilder::new().connect_client(rpc_client); let registry = get_lido_module_registry(registry_address, provider); - let total_keys: u64 = fetch_lido_module_keys_total(®istry, node_operator_id).await?.try_into()?; + let total_keys: u64 = fetch_lido_module_keys_total(®istry, node_operator_id).await?; collect_registry_keys(total_keys, |offset, limit| { fetch_lido_module_keys_batch(®istry, node_operator_id, offset, limit) - }).await + }) + .await } async fn fetch_lido_registry_keys( @@ -516,6 +582,7 @@ async fn fetch_lido_registry_keys( } async fn fetch_ssv_pubkeys( + api_url: Url, chain: Chain, node_operator_id: U256, http_timeout: Duration, @@ -533,11 +600,12 @@ async fn fetch_ssv_pubkeys( let mut page = 1; loop { - let url = format!( - 
"https://api.ssv.network/api/v4/{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", + let route = format!( + "{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", ); + let url = api_url.join(&route).wrap_err("failed to construct SSV API URL")?; - let response = fetch_ssv_pubkeys_from_url(&url, http_timeout).await?; + let response = fetch_ssv_pubkeys_from_url(url, http_timeout).await?; let fetched = response.validators.len(); pubkeys.extend( response.validators.into_iter().map(|v| v.pubkey).collect::>(), @@ -558,74 +626,12 @@ async fn fetch_ssv_pubkeys( Ok(pubkeys) } -async fn fetch_ssv_pubkeys_from_url( - url: &str, - http_timeout: Duration, -) -> eyre::Result { - let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; - let response = client.get(url).send().await.map_err(|e| { - if e.is_timeout() { - eyre::eyre!("Request to SSV network API timed out: {e}") - } else { - eyre::eyre!("Error sending request to SSV network API: {e}") - } - })?; - - // Parse the response as JSON - let body_bytes = safe_read_http_response(response).await?; - serde_json::from_slice::(&body_bytes).wrap_err("failed to parse SSV response") -} - -#[derive(Deserialize)] -struct SSVResponse { - validators: Vec, - pagination: SSVPagination, -} - -struct SSVValidator { - pubkey: BlsPublicKey, -} - -impl<'de> Deserialize<'de> for SSVValidator { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct SSVValidator { - public_key: String, - } - - let s = SSVValidator::deserialize(deserializer)?; - let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; - let pubkey = BlsPublicKey::deserialize(&bytes) - .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; - - Ok(Self { pubkey }) - } -} - -#[derive(Deserialize)] -struct SSVPagination { - total: usize, -} - #[cfg(test)] mod tests { - use 
std::net::SocketAddr; - use alloy::{primitives::U256, providers::ProviderBuilder}; - use axum::{response::Response, routing::get}; - use tokio::{net::TcpListener, task::JoinHandle}; use url::Url; use super::*; - use crate::{ - config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH}, - utils::{ResponseReadError, bls_pubkey_from_hex_unchecked, set_ignore_content_length}, - }; - - const TEST_HTTP_TIMEOUT: u64 = 2; #[tokio::test] async fn test_lido_registry_address() -> eyre::Result<()> { @@ -664,34 +670,25 @@ mod tests { #[tokio::test] async fn test_lido_csm_registry_address() -> eyre::Result<()> { - use alloy::{primitives::U256, providers::ProviderBuilder}; - let url = Url::parse("https://ethereum-rpc.publicnode.com")?; let provider = ProviderBuilder::new().connect_http(url); - - let registry = LidoCSMRegistry::new( - address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), - provider, - ); - + + let registry = + LidoCSMRegistry::new(address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), provider); + const LIMIT: usize = 3; let node_operator_id = U256::from(1); - - let summary = registry - .getNodeOperatorSummary(node_operator_id) - .call() - .await?; - + + let summary = registry.getNodeOperatorSummary(node_operator_id).call().await?; + let total_keys_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; let total_keys: u64 = total_keys_u256.try_into()?; - + assert!(total_keys > LIMIT as u64, "expected more than {LIMIT} keys, got {total_keys}"); - - let pubkeys = registry - .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)) - .call() - .await?; - + + let pubkeys = + registry.getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)).call().await?; + let mut vec = Vec::new(); for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { vec.push( @@ -699,190 +696,9 @@ mod tests { .map_err(|_| eyre::eyre!("invalid BLS public key"))?, ); } - - assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len()); - - Ok(()) - } 
- - #[tokio::test] - /// Tests that a successful SSV network fetch is handled and parsed properly - async fn test_ssv_network_fetch() -> eyre::Result<()> { - // Start the mock server - let port = 30100; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/ssv"); - let response = - fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) - .await?; - - // Make sure the response is correct - // NOTE: requires that ssv_data.json dpesn't change - assert_eq!(response.validators.len(), 3); - let expected_pubkeys = [ - bls_pubkey_from_hex_unchecked( - "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a", - ), - bls_pubkey_from_hex_unchecked( - "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c", - ), - bls_pubkey_from_hex_unchecked( - "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639", - ), - ]; - for (i, validator) in response.validators.iter().enumerate() { - assert_eq!(validator.pubkey, expected_pubkeys[i]); - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// body is too large - async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { - // Start the mock server - let port = 30101; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; - - // The response should fail due to content length being too big - match response { - Ok(_) => { - panic!("Expected an error due to big content length, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - 
assert!(*content_length > MUXER_HTTP_MAX_LENGTH); - assert!(raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the request - /// times out - async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { - // Start the mock server - let port = 30102; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/timeout"); - let response = - fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; - - // The response should fail due to timeout - assert!(response.is_err(), "Expected timeout error, but got success"); - if let Err(e) = response { - assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e); - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// content-length header is missing - async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> { - // Start the mock server - let port = 30103; - set_ignore_content_length(true); - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; - - // The response should fail due to the body being too big - match response { - Ok(_) => { - panic!("Expected an error due to excessive data, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - assert_eq!(*content_length, 0); - assert!(!raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - // Clean up the server handle - 
_server_handle.abort(); + assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len()); Ok(()) } - - /// Creates a simple mock server to simulate the SSV API endpoint under - /// various conditions for testing - async fn create_mock_server(port: u16) -> Result, axum::Error> { - let router = axum::Router::new() - .route("/ssv", get(handle_ssv)) - .route("/big_data", get(handle_big_data)) - .route("/timeout", get(handle_timeout)) - .into_make_service(); - - let address = SocketAddr::from(([127, 0, 0, 1], port)); - let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; - let server = axum::serve(listener, router).with_graceful_shutdown(async { - tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); - }); - let result = Ok(tokio::spawn(async move { - if let Err(e) = server.await { - eprintln!("Server error: {}", e); - } - })); - info!("Mock server started on http://localhost:{port}/"); - result - } - - /// Sends the good SSV JSON data to the client - async fn handle_ssv() -> Response { - // Read the JSON data - let data = include_str!("../../../../tests/data/ssv_valid.json"); - - // Create a valid response - Response::builder() - .status(200) - .header("Content-Type", "application/json") - .body(data.into()) - .unwrap() - } - - /// Sends a response with a large body - larger than the maximum allowed. - /// Note that hyper overwrites the content-length header automatically, so - /// setting it here wouldn't actually change the value that ultimately - /// gets sent to the server. 
- async fn handle_big_data() -> Response { - let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH); - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body(body.into()) - .unwrap() - } - - /// Simulates a timeout by sleeping for a long time - async fn handle_timeout() -> Response { - // Sleep for a long time to simulate a timeout - tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await; - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body("Timeout response".into()) - .unwrap() - } } diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 3e2c0a14..c2c30d2f 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -21,12 +21,12 @@ use super::{ use crate::{ commit::client::SignerClient, config::{ - CONFIG_ENV, MODULE_JWT_ENV, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, load_env_var, - load_file_from_env, + CONFIG_ENV, MODULE_JWT_ENV, MuxKeysLoader, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, + load_env_var, load_file_from_env, }, pbs::{ - DEFAULT_PBS_PORT, DefaultTimeout, LATE_IN_SLOT_TIME_MS, REGISTER_VALIDATOR_RETRY_LIMIT, - RelayClient, RelayEntry, + DEFAULT_PBS_PORT, DEFAULT_REGISTRY_REFRESH_SECONDS, DefaultTimeout, LATE_IN_SLOT_TIME_MS, + REGISTER_VALIDATOR_RETRY_LIMIT, RelayClient, RelayEntry, }, types::{BlsPublicKey, Chain, Jwt, ModuleId}, utils::{ @@ -123,6 +123,9 @@ pub struct PbsConfig { pub extra_validation_enabled: bool, /// Execution Layer RPC url to use for extra validation pub rpc_url: Option, + /// URL for the SSV network API + #[serde(default = "default_ssv_api_url")] + pub ssv_api_url: Url, /// Timeout for HTTP requests in seconds #[serde(default = "default_u64::")] pub http_timeout_seconds: u64, @@ -133,6 +136,11 @@ pub struct PbsConfig { /// request #[serde(deserialize_with = "empty_string_as_none", default)] pub validator_registration_batch_size: Option, + /// For any Registry-based Mux configurations 
that have dynamic pubkey + /// refreshing enabled, this is how often to refresh the list of pubkeys + /// from the registry, in seconds + #[serde(default = "default_u64::<{ DEFAULT_REGISTRY_REFRESH_SECONDS }>")] + pub mux_registry_refresh_interval_seconds: u64, } impl PbsConfig { @@ -183,6 +191,11 @@ impl PbsConfig { } } + ensure!( + self.mux_registry_refresh_interval_seconds > 0, + "registry mux refreshing interval must be greater than 0" + ); + Ok(()) } } @@ -213,13 +226,15 @@ pub struct PbsModuleConfig { /// List of default relays pub relays: Vec, /// List of all default relays plus additional relays from muxes (based on - /// URL) DO NOT use this for get_header calls, use `relays` or `muxes` + /// URL) DO NOT use this for get_header calls, use `relays` or `mux_lookup` /// instead pub all_relays: Vec, /// Signer client to call Signer API pub signer_client: Option, - /// Muxes config - pub muxes: Option>, + /// List of raw mux details configured, if any + pub registry_muxes: Option>, + /// Lookup of pubkey to mux config + pub mux_lookup: Option>, } fn default_pbs() -> String { @@ -246,19 +261,23 @@ pub async fn load_pbs_config() -> Result { SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port)) }; - let muxes = match config.muxes { - Some(muxes) => { - let mux_configs = muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; - Some(mux_configs) - } - None => None, - }; - + // Get the list of relays from the default config let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = + muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list 
of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -283,7 +302,8 @@ pub async fn load_pbs_config() -> Result { relays: relay_clients, all_relays, signer_client: None, - muxes, + registry_muxes, + mux_lookup, }) } @@ -319,20 +339,24 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC )) }; - let muxes = match cb_config.muxes { - Some(muxes) => Some( - muxes - .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) - .await?, - ), - None => None, - }; - + // Get the list of relays from the default config let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match cb_config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = muxes + .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) + .await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -371,8 +395,14 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC relays: relay_clients, all_relays, signer_client, - muxes, + registry_muxes, + mux_lookup, }, cb_config.pbs.extra, )) } + +/// Default URL for the SSV network API +fn default_ssv_api_url() -> Url { + Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid") +} diff --git a/crates/common/src/interop/mod.rs b/crates/common/src/interop/mod.rs new file mode 100644 index 00000000..42502f6f --- /dev/null +++ b/crates/common/src/interop/mod.rs @@ -0,0 +1 @@ +pub mod ssv; 
diff --git a/crates/common/src/interop/ssv/mod.rs b/crates/common/src/interop/ssv/mod.rs new file mode 100644 index 00000000..b4ab6a6a --- /dev/null +++ b/crates/common/src/interop/ssv/mod.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod utils; diff --git a/crates/common/src/interop/ssv/types.rs b/crates/common/src/interop/ssv/types.rs new file mode 100644 index 00000000..b8ac2e23 --- /dev/null +++ b/crates/common/src/interop/ssv/types.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::types::BlsPublicKey; + +/// Response from the SSV API for validators +#[derive(Deserialize, Serialize)] +pub struct SSVResponse { + /// List of validators returned by the SSV API + pub validators: Vec, + + /// Pagination information + pub pagination: SSVPagination, +} + +/// Representation of a validator in the SSV API +#[derive(Clone)] +pub struct SSVValidator { + /// The public key of the validator + pub pubkey: BlsPublicKey, +} + +impl<'de> Deserialize<'de> for SSVValidator { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator::deserialize(deserializer)?; + let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; + let pubkey = BlsPublicKey::deserialize(&bytes) + .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; + + Ok(Self { pubkey }) + } +} + +impl Serialize for SSVValidator { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator { public_key: self.pubkey.as_hex_string() }; + s.serialize(serializer) + } +} + +/// Pagination information from the SSV API +#[derive(Deserialize, Serialize)] +pub struct SSVPagination { + /// Total number of validators available + pub total: usize, +} diff --git a/crates/common/src/interop/ssv/utils.rs 
b/crates/common/src/interop/ssv/utils.rs new file mode 100644 index 00000000..e443e018 --- /dev/null +++ b/crates/common/src/interop/ssv/utils.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use eyre::Context; +use url::Url; + +use crate::{config::safe_read_http_response, interop::ssv::types::SSVResponse}; + +pub async fn fetch_ssv_pubkeys_from_url( + url: Url, + http_timeout: Duration, +) -> eyre::Result { + let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; + let response = client.get(url).send().await.map_err(|e| { + if e.is_timeout() { + eyre::eyre!("Request to SSV network API timed out: {e}") + } else { + eyre::eyre!("Error sending request to SSV network API: {e}") + } + })?; + + // Parse the response as JSON + let body_bytes = safe_read_http_response(response).await?; + serde_json::from_slice::(&body_bytes).wrap_err("failed to parse SSV response") +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 1fe1f26a..462dcec1 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -3,6 +3,7 @@ use std::time::Duration; pub mod commit; pub mod config; pub mod constants; +pub mod interop; pub mod pbs; pub mod signature; pub mod signer; diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index 533da137..bbe20b0d 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -35,3 +35,5 @@ pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; // Maximum number of retries for validator registration request per relay pub const REGISTER_VALIDATOR_RETRY_LIMIT: u32 = 3; + +pub const DEFAULT_REGISTRY_REFRESH_SECONDS: u64 = 12 * 32; // One epoch diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index d972477b..764ab188 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,4 +1,4 @@ -#[cfg(test)] +#[cfg(feature = "testing-flags")] use std::cell::Cell; use std::{ net::Ipv4Addr, @@ -43,17 +43,18 @@ pub enum ResponseReadError { 
ReqwestError(#[from] reqwest::Error), } -#[cfg(test)] +#[cfg(feature = "testing-flags")] thread_local! { static IGNORE_CONTENT_LENGTH: Cell = const { Cell::new(false) }; } -#[cfg(test)] +#[cfg(feature = "testing-flags")] pub fn set_ignore_content_length(val: bool) { IGNORE_CONTENT_LENGTH.with(|f| f.set(val)); } -#[cfg(test)] +#[cfg(feature = "testing-flags")] +#[allow(dead_code)] fn should_ignore_content_length() -> bool { IGNORE_CONTENT_LENGTH.with(|f| f.get()) } @@ -65,13 +66,13 @@ pub async fn read_chunked_body_with_max( max_size: usize, ) -> Result, ResponseReadError> { // Get the content length from the response headers - #[cfg(not(test))] + #[cfg(not(feature = "testing-flags"))] let content_length = res.content_length(); - #[cfg(test)] + #[cfg(feature = "testing-flags")] let mut content_length = res.content_length(); - #[cfg(test)] + #[cfg(feature = "testing-flags")] if should_ignore_content_length() { // Used for testing purposes to ignore content length content_length = None; diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 98a411f7..9ed312af 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -35,7 +35,7 @@ pub async fn handle_get_header>( info!(ua, ms_into_slot, "new request"); - match A::get_header(params, req_headers, state.clone()).await { + match A::get_header(params, req_headers, state).await { Ok(res) => { if let Some(max_bid) = res { info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header"); diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index dad85d27..51c8ce6e 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -24,7 +24,7 @@ pub async fn handle_register_validator>( info!(ua, num_registrations = registrations.len(), "new request"); - if let Err(err) = A::register_validator(registrations, 
req_headers, state.clone()).await { + if let Err(err) = A::register_validator(registrations, req_headers, state).await { error!(%err, "all relays failed registration"); let err = PbsClientError::NoResponse; diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index 86c09273..aa031d47 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -20,7 +20,7 @@ pub async fn handle_reload>( info!(ua, relay_check = prev_state.config.pbs_config.relay_check); - match A::reload(prev_state.clone()).await { + match A::reload(prev_state).await { Ok(new_state) => { info!("config reload successful"); diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index d0f68ac7..52fd3e2f 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -21,7 +21,7 @@ pub async fn handle_get_status>( info!(ua, relay_check = state.config.pbs_config.relay_check, "new request"); - match A::get_status(req_headers, state.clone()).await { + match A::get_status(req_headers, state).await { Ok(_) => { info!("relay check successful"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index b3e98553..4784e6b1 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -65,7 +65,7 @@ async fn handle_submit_block_impl>( info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); - match A::submit_block(signed_blinded_block, req_headers, state.clone(), &api_version).await { + match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await { Ok(res) => match res { Some(block_response) => { trace!(?block_response); diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 16afb6a7..6659ae85 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -1,6 +1,11 @@ -use std::time::Duration; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; 
use cb_common::{ + config::{MuxKeysLoader, PbsModuleConfig}, constants::{COMMIT_BOOST_COMMIT, COMMIT_BOOST_VERSION}, pbs::{BUILDER_V1_API_PATH, GET_STATUS_PATH}, types::Chain, @@ -10,14 +15,14 @@ use eyre::{Context, Result, bail}; use parking_lot::RwLock; use prometheus::core::Collector; use tokio::net::TcpListener; -use tracing::info; +use tracing::{debug, info, warn}; use url::Url; use crate::{ api::BuilderApi, metrics::PBS_METRICS_REGISTRY, routes::create_app_router, - state::{BuilderApiState, PbsState}, + state::{BuilderApiState, PbsState, PbsStateGuard}, }; pub struct PbsService; @@ -27,7 +32,16 @@ impl PbsService { let addr = state.config.endpoint; info!(version = COMMIT_BOOST_VERSION, commit_hash = COMMIT_BOOST_COMMIT, ?addr, chain =? state.config.chain, "starting PBS service"); - let app = create_app_router::(RwLock::new(state).into()); + // Check if refreshing registry muxes is required + let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; + let is_refreshing_required = state.config.registry_muxes.as_ref().is_some_and(|muxes| { + muxes.iter().any(|(loader, _)| { + matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. }) + }) + }); + + let state: Arc>> = RwLock::new(state).into(); + let app = create_app_router::(state.clone()); let listener = TcpListener::bind(addr).await?; let task = @@ -45,6 +59,24 @@ impl PbsService { bail!("PBS server failed to start. Are the relays properly configured?"); } + // Run the registry refresher task + if is_refreshing_required { + let mut interval = tokio::time::interval(Duration::from_secs(registry_refresh_time)); + let state = state.clone(); + tokio::spawn(async move { + let mut is_first_tick = true; + loop { + interval.tick().await; + if is_first_tick { + // Don't run immediately on the first tick, since it was just initialized + is_first_tick = false; + continue; + } + Self::refresh_registry_muxes(state.clone()).await; + } + }); + } + task.await? 
} @@ -55,4 +87,111 @@ impl PbsService { pub fn init_metrics(network: Chain) -> Result<()> { MetricsProvider::load_and_run(network, PBS_METRICS_REGISTRY.clone()) } + + async fn refresh_registry_muxes(state: PbsStateGuard) { + // Read-only portion + let mut new_pubkeys = HashMap::new(); + let mut removed_pubkeys = HashSet::new(); + { + let state = state.read().clone(); + let config = &state.config; + + // Short circuit if there aren't any registry muxes with dynamic refreshing + let registry_muxes = match &config.registry_muxes { + Some(muxes) => muxes, + None => return, + }; + + // Initialize an empty lookup if the config doesn't have one yet + let mux_lookup = match &config.mux_lookup { + Some(lookup) => lookup, + None => &HashMap::new(), + }; + + // Go through each registry mux and refresh its pubkeys + let default_pbs = &config.pbs_config; + let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); + for (loader, runtime_config) in registry_muxes.iter() { + debug!("refreshing pubkeys for registry mux {}", runtime_config.id); + match loader + .load( + &runtime_config.id, + config.chain, + default_pbs.ssv_api_url.clone(), + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await + { + Ok(pubkeys) => { + debug!( + "fetched {} pubkeys for registry mux {}", + pubkeys.len(), + runtime_config.id + ); + + // Add any new pubkeys to the new lookup table + let mut pubkey_set = HashSet::new(); + for pubkey in pubkeys { + pubkey_set.insert(pubkey.clone()); + if mux_lookup.get(&pubkey).is_none() { + // New pubkey + new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); + } + } + + // Find any pubkeys that were removed + for (pubkey, existing_runtime) in mux_lookup.iter() { + if existing_runtime.id == runtime_config.id && + !pubkey_set.contains(pubkey) + { + removed_pubkeys.insert(pubkey.clone()); + } + } + } + Err(err) => { + warn!(%err, "failed to refresh pubkeys for registry mux {}", runtime_config.id); + } + } + } + } + + // Report changes + let mut 
no_new_changes = true; + if !new_pubkeys.is_empty() { + no_new_changes = false; + info!("discovered {} new pubkeys from registries", new_pubkeys.len()); + } + if !removed_pubkeys.is_empty() { + no_new_changes = false; + info!("registries have removed {} old pubkeys", removed_pubkeys.len()); + } + + // Write portion + if no_new_changes { + return; + } + { + // Since config isn't an RwLock, the option with the least amount of code churn + // is to just clone the whole config and replace the mux_lookup + // field. Cloning the config may be expensive, but this should be a fairly rare + // operation. + let mut state = state.write(); + let config = state.config.as_ref(); + let new_mux_lookup = if let Some(existing) = &config.mux_lookup { + let mut map = HashMap::new(); + for (k, v) in existing.iter() { + if !removed_pubkeys.contains(k) { + map.insert(k.clone(), v.clone()); + } + } + map.extend(new_pubkeys); + map + } else { + new_pubkeys + }; + state.config = + Arc::new(PbsModuleConfig { mux_lookup: Some(new_mux_lookup), ..config.clone() }); + } + } } diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 4403b348..dd0e118e 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -56,7 +56,7 @@ where &self, pubkey: &BlsPublicKey, ) -> (&PbsConfig, &[RelayClient], Option<&str>) { - match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { + match self.config.mux_lookup.as_ref().and_then(|muxes| muxes.get(pubkey)) { Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)), // return only the default relays if there's no match None => (self.pbs_config(), &self.config.relays, None), diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 3195cc11..5e8e1596 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -18,5 +18,9 @@ tempfile.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +tracing-test.workspace = true tree_hash.workspace = true url.workspace = true + 
+[dev-dependencies] +cb-common = { path = "../crates/common", features = ["testing-flags"] } \ No newline at end of file diff --git a/tests/src/lib.rs b/tests/src/lib.rs index a4fbbb6a..42e36a8e 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,3 +1,4 @@ pub mod mock_relay; +pub mod mock_ssv; pub mod mock_validator; pub mod utils; diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs new file mode 100644 index 00000000..3fa12546 --- /dev/null +++ b/tests/src/mock_ssv.rs @@ -0,0 +1,108 @@ +use std::{net::SocketAddr, sync::Arc}; + +use axum::{ + extract::{Path, State}, + response::Response, + routing::get, +}; +use cb_common::{ + config::MUXER_HTTP_MAX_LENGTH, + interop::ssv::types::{SSVPagination, SSVResponse, SSVValidator}, +}; +use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; +use tracing::info; + +pub const TEST_HTTP_TIMEOUT: u64 = 2; + +/// State for the mock server +#[derive(Clone)] +pub struct SsvMockState { + /// List of pubkeys for the mock server to return + pub validators: Arc>>, + + /// Whether to force a timeout response to simulate a server error + pub force_timeout: Arc>, +} + +/// Creates a simple mock server to simulate the SSV API endpoint under +/// various conditions for testing. 
Note this ignores +pub async fn create_mock_ssv_server( + port: u16, + state: Option, +) -> Result, axum::Error> { + let data = include_str!("../../tests/data/ssv_valid.json"); + let response = serde_json::from_str::(data).expect("failed to parse test data"); + let state = state.unwrap_or(SsvMockState { + validators: Arc::new(RwLock::new(response.validators)), + force_timeout: Arc::new(RwLock::new(false)), + }); + let router = axum::Router::new() + .route("/{chain_name}/validators/in_operator/{node_operator_id}", get(handle_validators)) + .route("/big_data", get(handle_big_data)) + .with_state(state) + .into_make_service(); + + let address = SocketAddr::from(([127, 0, 0, 1], port)); + let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; + let server = axum::serve(listener, router).with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); + }); + let result = Ok(tokio::spawn(async move { + if let Err(e) = server.await { + eprintln!("Server error: {e}"); + } + })); + info!("Mock server started on http://localhost:{port}/"); + result +} + +/// Returns a valid SSV validators response, or a timeout if requested in +/// the server state +async fn handle_validators( + State(state): State, + Path((_, _)): Path<(String, u64)>, +) -> Response { + // Time out if requested + if *state.force_timeout.read().await { + return handle_timeout().await; + } + + // Generate the response based on the current validators + let response: SSVResponse; + { + let validators = state.validators.read().await; + let pagination = SSVPagination { total: validators.len() }; + response = SSVResponse { validators: validators.clone(), pagination }; + } + + // Create a valid response + Response::builder() + .status(200) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&response).unwrap().into()) + .unwrap() +} + +/// Sends a response with a large body - larger than the maximum allowed. 
+/// Note that hyper overwrites the content-length header automatically, so +/// setting it here wouldn't actually change the value that ultimately +/// gets sent to the server. +async fn handle_big_data() -> Response { + let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH); + Response::builder() + .status(200) + .header("Content-Type", "application/text") + .body(body.into()) + .unwrap() +} + +/// Simulates a timeout by sleeping for a long time +async fn handle_timeout() -> Response { + // Sleep for a long time to simulate a timeout + tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await; + Response::builder() + .status(200) + .header("Content-Type", "application/text") + .body("Timeout response".into()) + .unwrap() +} diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 1c83e0eb..58ef42cf 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -14,9 +14,10 @@ use cb_common::{ pbs::{RelayClient, RelayEntry}, signer::SignerLoader, types::{BlsPublicKey, Chain, ModuleId}, - utils::default_host, + utils::{bls_pubkey_from_hex, default_host}, }; use eyre::Result; +use url::Url; pub fn get_local_address(port: u16) -> String { format!("http://0.0.0.0:{port}") @@ -78,10 +79,12 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, extra_validation_enabled: false, + ssv_api_url: Url::parse("https://example.net").unwrap(), rpc_url: None, http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, validator_registration_batch_size: None, + mux_registry_refresh_interval_seconds: 5, } } @@ -97,7 +100,8 @@ pub fn to_pbs_config( signer_client: None, all_relays: relays.clone(), relays, - muxes: None, + registry_muxes: None, + mux_lookup: None, } } @@ -131,3 +135,7 @@ pub fn get_start_signer_config( _ => panic!("Only local signers are supported in tests"), } } + +pub fn bls_pubkey_from_hex_unchecked(hex: &str) -> BlsPublicKey { + bls_pubkey_from_hex(hex).unwrap() +} diff --git 
a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 28a3d369..5cde1158 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -1,15 +1,152 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use cb_common::{config::RuntimeMuxConfig, signer::random_secret, types::Chain}; +use cb_common::{ + config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, RuntimeMuxConfig}, + interop::ssv::utils::fetch_ssv_pubkeys_from_url, + signer::random_secret, + types::Chain, + utils::{ResponseReadError, set_ignore_content_length}, +}; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{MockRelayState, start_mock_relay_service}, + mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_ssv_server}, mock_validator::MockValidator, - utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config}, + utils::{ + bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_static_config, setup_test_env, + to_pbs_config, + }, }; use eyre::Result; use reqwest::StatusCode; +use tokio::sync::RwLock; use tracing::info; +use url::Url; + +#[tokio::test] +/// Tests that a successful SSV network fetch is handled and parsed properly +async fn test_ssv_network_fetch() -> Result<()> { + // Start the mock server + let port = 30100; + let _server_handle = create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); + let response = + fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?; + + // Make sure the response is correct + // NOTE: requires that ssv_data.json doesn't change + assert_eq!(response.validators.len(), 3); + let expected_pubkeys = [ + bls_pubkey_from_hex_unchecked( + "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a", + ), + bls_pubkey_from_hex_unchecked(
"ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c", + ), + bls_pubkey_from_hex_unchecked( + "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639", + ), + ]; + for (i, validator) in response.validators.iter().enumerate() { + assert_eq!(validator.pubkey, expected_pubkeys[i]); + } + + // Clean up the server handle + _server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the response's +/// body is too large +async fn test_ssv_network_fetch_big_data() -> Result<()> { + // Start the mock server + let port = 30101; + let server_handle = cb_tests::mock_ssv::create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + + // The response should fail due to content length being too big + match response { + Ok(_) => { + panic!("Expected an error due to big content length, but got a successful response") + } + Err(e) => match e.downcast_ref::() { + Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { + assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); + assert!(*content_length > MUXER_HTTP_MAX_LENGTH); + assert!(raw.is_empty()); + } + _ => panic!("Expected PayloadTooLarge error, got: {}", e), + }, + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the request +/// times out +async fn test_ssv_network_fetch_timeout() -> Result<()> { + // Start the mock server + let port = 30102; + let state = SsvMockState { + validators: Arc::new(RwLock::new(vec![])), + force_timeout: Arc::new(RwLock::new(true)), + }; + let server_handle = create_mock_ssv_server(port, Some(state)).await?; + let url = 
Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; + + // The response should fail due to timeout + assert!(response.is_err(), "Expected timeout error, but got success"); + if let Err(e) = response { + assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e); + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the response's +/// content-length header is missing +async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> { + // Start the mock server + let port = 30103; + set_ignore_content_length(true); + let server_handle = create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + + // The response should fail due to the body being too big + match response { + Ok(_) => { + panic!("Expected an error due to excessive data, but got a successful response") + } + Err(e) => match e.downcast_ref::() { + Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { + assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); + assert_eq!(*content_length, 0); + assert!(!raw.is_empty()); + } + _ => panic!("Expected PayloadTooLarge error, got: {}", e), + }, + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} #[tokio::test] async fn test_mux() -> Result<()> { @@ -44,7 +181,7 @@ async fn test_mux() -> Result<()> { // Bind mux to a specific validator key let validator_pubkey = random_secret().public_key(); - config.muxes = Some(HashMap::from([(validator_pubkey.clone(), mux)])); + config.mux_lookup = Some(HashMap::from([(validator_pubkey.clone(), mux)])); // Run PBS service let state = PbsState::new(config); diff --git 
a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs new file mode 100644 index 00000000..1d4eb3ac --- /dev/null +++ b/tests/tests/pbs_mux_refresh.rs @@ -0,0 +1,175 @@ +use std::{sync::Arc, time::Duration}; + +use cb_common::{ + config::{MuxConfig, MuxKeysLoader, PbsMuxes}, + interop::ssv::types::SSVValidator, + signer::random_secret, + types::Chain, +}; +use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; +use cb_tests::{ + mock_relay::{MockRelayState, start_mock_relay_service}, + mock_ssv::{SsvMockState, create_mock_ssv_server}, + mock_validator::MockValidator, + utils::{generate_mock_relay, get_pbs_static_config, to_pbs_config}, +}; +use eyre::Result; +use reqwest::StatusCode; +use tokio::sync::RwLock; +use tracing::info; +use url::Url; + +#[tokio::test] +#[allow(unused_assignments)] +#[tracing_test::traced_test] +async fn test_auto_refresh() -> Result<()> { + // This test reads the log files to verify behavior, so we can't attach a global + // trace listener setup_test_env(); + + // Generate 3 keys: one not in the mux relay, one in the relay, and one that + // hasn't been added yet but will be later. The existing key isn't used but is + // needed in the initial config since CB won't start a mux without at least one + // key. 
+ let default_signer = random_secret(); + let default_pubkey = default_signer.public_key(); + let existing_mux_signer = random_secret(); + let existing_mux_pubkey = existing_mux_signer.public_key(); + let new_mux_signer = random_secret(); + let new_mux_pubkey = new_mux_signer.public_key(); + + let chain = Chain::Hoodi; + let pbs_port = 3710; + + // Start the mock SSV API server + let ssv_api_port = pbs_port + 1; + let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?; + let mock_ssv_state = SsvMockState { + validators: Arc::new(RwLock::new(vec![SSVValidator { + pubkey: existing_mux_pubkey.clone(), + }])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_server_handle = + create_mock_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; + + // Start a default relay for non-mux keys + let default_relay_port = ssv_api_port + 1; + let default_relay = generate_mock_relay(default_relay_port, default_pubkey.clone())?; + let default_relay_state = Arc::new(MockRelayState::new(chain, default_signer.clone())); + let default_relay_task = + tokio::spawn(start_mock_relay_service(default_relay_state.clone(), default_relay_port)); + + // Start a mock relay to be used by the mux + let mux_relay_port = default_relay_port + 1; + let mux_relay = generate_mock_relay(mux_relay_port, default_pubkey.clone())?; + let mux_relay_id = mux_relay.id.clone().to_string(); + let mux_relay_state = Arc::new(MockRelayState::new(chain, default_signer)); + let mux_relay_task = + tokio::spawn(start_mock_relay_service(mux_relay_state.clone(), mux_relay_port)); + + // Create the registry mux + let loader = MuxKeysLoader::Registry { + enable_refreshing: true, + node_operator_id: 1, + lido_module_id: None, + registry: cb_common::config::NORegistry::SSV, + }; + let muxes = PbsMuxes { + muxes: vec![MuxConfig { + id: mux_relay_id.clone(), + loader: Some(loader), + late_in_slot_time_ms: Some(u64::MAX), + relays: vec![(*mux_relay.config).clone()], + 
timeout_get_header_ms: Some(u64::MAX - 1), + validator_pubkeys: vec![], + }], + }; + + // Set up the PBS config + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.ssv_api_url = ssv_api_url.clone(); + pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second + let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; + let relays = vec![default_relay.clone()]; // Default relay only + let mut config = to_pbs_config(chain, pbs_config, relays); + config.all_relays.push(mux_relay.clone()); // Add the mux relay to just this field + config.mux_lookup = Some(mux_lookup); + config.registry_muxes = Some(registry_muxes); + + // Run PBS service + let state = PbsState::new(config); + let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + info!("Started PBS server with pubkey {default_pubkey}"); + + // Wait for the server to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to run a get_header on the new pubkey, which should use the default + // relay only since it hasn't been seen in the mux yet + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used + + // Wait for the first refresh to complete + let wait_for_refresh_time = Duration::from_secs(2); + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure a refresh happened + assert!(logs_contain(&format!("fetched 1 pubkeys for registry mux {mux_relay_id}"))); + assert!(!logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); + assert!(!logs_contain("adding new pubkey")); + + // Add another validator + { + let mut validators = 
mock_ssv_state.validators.write().await; + validators.push(SSVValidator { pubkey: new_mux_pubkey.clone() }); + info!("Added new validator {new_mux_pubkey} to the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure the new pubkey was added + assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); + + // Try to run a get_header on the new pubkey - now it should use the mux relay + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used + + // Now try to do a get_header with the old pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + + // Finally, remove the original mux pubkey from the SSV server + { + let mut validators = mock_ssv_state.validators.write().await; + validators.retain(|v| v.pubkey != existing_mux_pubkey); + info!("Removed existing validator {existing_mux_pubkey} from the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Try to do a get_header with the removed pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + + // Shut down the server 
handles + pbs_server.abort(); + ssv_server_handle.abort(); + default_relay_task.abort(); + mux_relay_task.abort(); + + Ok(()) +} From d1fe243b947dc8f68a6f656ed89b6acdc4b96ced Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 21 Oct 2025 13:21:19 -0400 Subject: [PATCH 07/14] Update to v0.9.0 (#394) --- Cargo.lock | 20 ++++++++++---------- Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 906a8507..f034c39c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "cb-bench-pbs" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "alloy", "cb-common", @@ -1592,7 +1592,7 @@ dependencies = [ [[package]] name = "cb-cli" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "cb-common", "clap", @@ -1604,7 +1604,7 @@ dependencies = [ [[package]] name = "cb-common" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "aes 0.8.4", "alloy", @@ -1650,7 +1650,7 @@ dependencies = [ [[package]] name = "cb-metrics" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "axum 0.8.4", "cb-common", @@ -1663,7 +1663,7 @@ dependencies = [ [[package]] name = "cb-pbs" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "alloy", "async-trait", @@ -1689,7 +1689,7 @@ dependencies = [ [[package]] name = "cb-signer" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "alloy", "axum 0.8.4", @@ -1718,7 +1718,7 @@ dependencies = [ [[package]] name = "cb-tests" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "alloy", "axum 0.8.4", @@ -1877,7 +1877,7 @@ dependencies = [ [[package]] name = "commit-boost" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "cb-cli", "cb-common", @@ -2165,7 +2165,7 @@ dependencies = [ [[package]] name = "da_commit" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "alloy", "color-eyre", @@ -6076,7 +6076,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" 
[[package]] name = "status_api" -version = "0.8.1-rc.4" +version = "0.9.0" dependencies = [ "async-trait", "axum 0.8.4", diff --git a/Cargo.toml b/Cargo.toml index 5e89d803..68cb9e27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" [workspace.package] edition = "2024" rust-version = "1.89" -version = "0.8.1-rc.4" +version = "0.9.0" [workspace.dependencies] aes = "0.8" From ade0a8f2aea42efe53b47801bc0778a85317d245 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Wed, 22 Oct 2025 10:29:02 -0400 Subject: [PATCH 08/14] Support SSV API URLs without trailing slashes (#396) --- crates/common/src/config/mux.rs | 11 ++++++++++- crates/common/src/config/pbs.rs | 2 +- tests/src/mock_ssv.rs | 5 ++++- tests/tests/pbs_mux.rs | 10 ++++++---- tests/tests/pbs_mux_refresh.rs | 3 ++- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 86318174..d26b7edf 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -582,7 +582,7 @@ async fn fetch_lido_registry_keys( } async fn fetch_ssv_pubkeys( - api_url: Url, + mut api_url: Url, chain: Chain, node_operator_id: U256, http_timeout: Duration, @@ -599,6 +599,15 @@ async fn fetch_ssv_pubkeys( let mut pubkeys: Vec = vec![]; let mut page = 1; + // Validate the URL - this appends a trailing slash if missing as efficiently as + // possible + if !api_url.path().ends_with('/') { + match api_url.path_segments_mut() { + Ok(mut segments) => segments.push(""), // Analogous to a trailing slash + Err(_) => bail!("SSV API URL is not a valid base URL"), + }; + } + loop { let route = format!( "{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index c2c30d2f..7bcf91e3 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -404,5 +404,5 @@ pub async fn load_pbs_custom_config() 
-> Result<(PbsModuleC /// Default URL for the SSV network API fn default_ssv_api_url() -> Url { - Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid") + Url::parse("https://api.ssv.network/api/v4/").expect("default URL is valid") } diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs index 3fa12546..7ed8eb23 100644 --- a/tests/src/mock_ssv.rs +++ b/tests/src/mock_ssv.rs @@ -37,7 +37,10 @@ pub async fn create_mock_ssv_server( force_timeout: Arc::new(RwLock::new(false)), }); let router = axum::Router::new() - .route("/{chain_name}/validators/in_operator/{node_operator_id}", get(handle_validators)) + .route( + "/api/v4/{chain_name}/validators/in_operator/{node_operator_id}", + get(handle_validators), + ) .route("/big_data", get(handle_big_data)) .with_state(state) .into_make_service(); diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 5cde1158..3a15b49b 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -29,8 +29,9 @@ async fn test_ssv_network_fetch() -> Result<()> { // Start the mock server let port = 30100; let _server_handle = create_mock_ssv_server(port, None).await?; - let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) - .unwrap(); + let url = + Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1")) + .unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?; @@ -100,8 +101,9 @@ async fn test_ssv_network_fetch_timeout() -> Result<()> { force_timeout: Arc::new(RwLock::new(true)), }; let server_handle = create_mock_ssv_server(port, Some(state)).await?; - let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) - .unwrap(); + let url = + Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1")) + .unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; 
// The response should fail due to timeout diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 1d4eb3ac..da582ec7 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -42,7 +42,8 @@ async fn test_auto_refresh() -> Result<()> { // Start the mock SSV API server let ssv_api_port = pbs_port + 1; - let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?; + // Intentionally missing a trailing slash to ensure this is handled properly + let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}/api/v4"))?; let mock_ssv_state = SsvMockState { validators: Arc::new(RwLock::new(vec![SSVValidator { pubkey: existing_mux_pubkey.clone(), From 8bf5e09f3679a18a70542f282bf8f45b1e5e205b Mon Sep 17 00:00:00 2001 From: Nils Effinghausen Date: Mon, 27 Oct 2025 20:58:17 +0100 Subject: [PATCH 09/14] feat(docs): add submodule initialization (#391) --- docs/docs/get_started/overview.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/docs/get_started/overview.md b/docs/docs/get_started/overview.md index b9ea1d7f..e5209f09 100644 --- a/docs/docs/get_started/overview.md +++ b/docs/docs/get_started/overview.md @@ -40,6 +40,9 @@ git clone https://github.com/Commit-Boost/commit-boost-client # Stable branch has the latest released version git checkout stable + +# Init submodules +git submodule update --init --recursive ``` :::note From 65fa572b50a49cfe45d4e550f5f3aaa600bcb7fa Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 28 Oct 2025 09:09:24 -0400 Subject: [PATCH 10/14] Moved Lido implementation to interop --- crates/common/src/config/mux.rs | 273 +----------------------- crates/common/src/interop/lido/mod.rs | 2 + crates/common/src/interop/lido/types.rs | 15 ++ crates/common/src/interop/lido/utils.rs | 268 +++++++++++++++++++++++ crates/common/src/interop/mod.rs | 1 + 5 files changed, 289 insertions(+), 270 deletions(-) create mode 100644 crates/common/src/interop/lido/mod.rs create mode 
100644 crates/common/src/interop/lido/types.rs create mode 100644 crates/common/src/interop/lido/utils.rs diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index d26b7edf..7922ee8b 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -6,12 +6,10 @@ use std::{ time::Duration, }; -use LidoCSMRegistry::getNodeOperatorSummaryReturn; use alloy::{ - primitives::{Address, Bytes, U256, address}, + primitives::{Address, Bytes, U256}, providers::ProviderBuilder, rpc::{client::RpcClient, types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN}, - sol, transports::http::Http, }; use eyre::{Context, bail, ensure}; @@ -23,9 +21,9 @@ use url::Url; use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, - interop::ssv::utils::fetch_ssv_pubkeys_from_url, + interop::{lido::utils::*, ssv::utils::*}, pbs::RelayClient, - types::{BlsPublicKey, Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule}, + types::{BlsPublicKey, Chain}, utils::default_bool, }; @@ -291,194 +289,6 @@ fn get_mux_path(mux_id: &str) -> String { format!("/{mux_id}-mux_keys.json") } -sol! { - #[allow(missing_docs)] - #[sol(rpc)] - LidoRegistry, - "src/abi/LidoNORegistry.json" -} - -sol! 
{ - #[allow(missing_docs)] - #[sol(rpc)] - LidoCSMRegistry, - "src/abi/LidoCSModuleNORegistry.json" -} - -fn lido_registry_addresses_by_module() -> HashMap> { - let mut map: HashMap> = HashMap::new(); - - // --- Mainnet --- - let mut mainnet = HashMap::new(); - mainnet.insert( - MainnetLidoModule::Curated as u8, - address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), - ); - mainnet.insert( - MainnetLidoModule::SimpleDVT as u8, - address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"), - ); - mainnet.insert( - MainnetLidoModule::CommunityStaking as u8, - address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), - ); - map.insert(Chain::Mainnet, mainnet); - - // --- Holesky --- - let mut holesky = HashMap::new(); - holesky.insert( - HoleskyLidoModule::Curated as u8, - address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"), - ); - holesky.insert( - HoleskyLidoModule::SimpleDVT as u8, - address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"), - ); - holesky.insert( - HoleskyLidoModule::Sandbox as u8, - address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"), - ); - holesky.insert( - HoleskyLidoModule::CommunityStaking as u8, - address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"), - ); - map.insert(Chain::Holesky, holesky); - - // --- Hoodi --- - let mut hoodi = HashMap::new(); - hoodi.insert( - HoodiLidoModule::Curated as u8, - address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"), - ); - hoodi.insert( - HoodiLidoModule::SimpleDVT as u8, - address!("0B5236BECA68004DB89434462DfC3BB074d2c830"), - ); - hoodi.insert( - HoodiLidoModule::Sandbox as u8, - address!("682E94d2630846a503BDeE8b6810DF71C9806891"), - ); - hoodi.insert( - HoodiLidoModule::CommunityStaking as u8, - address!("79CEf36D84743222f37765204Bec41E92a93E59d"), - ); - map.insert(Chain::Hoodi, hoodi); - - // --- Sepolia -- - let mut sepolia = HashMap::new(); - sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")); - map.insert(Chain::Sepolia, sepolia); - - map -} - -// Fetching appropiate registry address -fn 
lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result

{ - lido_registry_addresses_by_module() - .get(&chain) - .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? - .get(&lido_module_id) - .copied() - .ok_or_else(|| { - eyre::eyre!("Lido module id {:?} not found for chain: {chain:?}", lido_module_id) - }) -} - -fn is_csm_module(chain: Chain, module_id: u8) -> bool { - match chain { - Chain::Mainnet => module_id == MainnetLidoModule::CommunityStaking as u8, - Chain::Holesky => module_id == HoleskyLidoModule::CommunityStaking as u8, - Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8, - _ => false, - } -} - -fn get_lido_csm_registry

( - registry_address: Address, - provider: P, -) -> LidoCSMRegistry::LidoCSMRegistryInstance

-where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - LidoCSMRegistry::new(registry_address, provider) -} - -fn get_lido_module_registry

( - registry_address: Address, - provider: P, -) -> LidoRegistry::LidoRegistryInstance

-where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - LidoRegistry::new(registry_address, provider) -} - -async fn fetch_lido_csm_keys_total

( - registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, - node_operator_id: U256, -) -> eyre::Result -where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - let summary: getNodeOperatorSummaryReturn = - registry.getNodeOperatorSummary(node_operator_id).call().await?; - - let total_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; - - let total_u64 = u64::try_from(total_u256) - .wrap_err_with(|| format!("total keys ({total_u256}) does not fit into u64"))?; - - Ok(total_u64) -} - -async fn fetch_lido_module_keys_total

( - registry: &LidoRegistry::LidoRegistryInstance

, - node_operator_id: U256, -) -> eyre::Result -where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - let total_keys: u64 = - registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; - - Ok(total_keys) -} - -async fn fetch_lido_csm_keys_batch

( - registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, - node_operator_id: U256, - offset: u64, - limit: u64, -) -> eyre::Result -where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - let pubkeys = registry - .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) - .call() - .await?; - - Ok(pubkeys) -} - -async fn fetch_lido_module_keys_batch

( - registry: &LidoRegistry::LidoRegistryInstance

, - node_operator_id: U256, - offset: u64, - limit: u64, -) -> eyre::Result -where - P: Clone + Send + Sync + 'static + alloy::providers::Provider, -{ - let pubkeys = registry - .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) - .call() - .await? - .pubkeys; - - Ok(pubkeys) -} - async fn collect_registry_keys( total_keys: u64, mut fetch_batch: F, @@ -634,80 +444,3 @@ async fn fetch_ssv_pubkeys( Ok(pubkeys) } - -#[cfg(test)] -mod tests { - use alloy::{primitives::U256, providers::ProviderBuilder}; - use url::Url; - - use super::*; - - #[tokio::test] - async fn test_lido_registry_address() -> eyre::Result<()> { - let url = Url::parse("https://ethereum-rpc.publicnode.com")?; - let provider = ProviderBuilder::new().connect_http(url); - - let registry = - LidoRegistry::new(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), provider); - - const LIMIT: usize = 3; - let node_operator_id = U256::from(1); - - let total_keys: u64 = - registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; - - assert!(total_keys > LIMIT as u64); - - let pubkeys = registry - .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)) - .call() - .await? 
- .pubkeys; - - let mut vec = vec![]; - for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { - vec.push( - BlsPublicKey::deserialize(chunk) - .map_err(|_| eyre::eyre!("invalid BLS public key"))?, - ); - } - - assert_eq!(vec.len(), LIMIT); - - Ok(()) - } - - #[tokio::test] - async fn test_lido_csm_registry_address() -> eyre::Result<()> { - let url = Url::parse("https://ethereum-rpc.publicnode.com")?; - let provider = ProviderBuilder::new().connect_http(url); - - let registry = - LidoCSMRegistry::new(address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), provider); - - const LIMIT: usize = 3; - let node_operator_id = U256::from(1); - - let summary = registry.getNodeOperatorSummary(node_operator_id).call().await?; - - let total_keys_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; - let total_keys: u64 = total_keys_u256.try_into()?; - - assert!(total_keys > LIMIT as u64, "expected more than {LIMIT} keys, got {total_keys}"); - - let pubkeys = - registry.getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)).call().await?; - - let mut vec = Vec::new(); - for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { - vec.push( - BlsPublicKey::deserialize(chunk) - .map_err(|_| eyre::eyre!("invalid BLS public key"))?, - ); - } - - assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len()); - - Ok(()) - } -} diff --git a/crates/common/src/interop/lido/mod.rs b/crates/common/src/interop/lido/mod.rs new file mode 100644 index 00000000..b4ab6a6a --- /dev/null +++ b/crates/common/src/interop/lido/mod.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod utils; diff --git a/crates/common/src/interop/lido/types.rs b/crates/common/src/interop/lido/types.rs new file mode 100644 index 00000000..48aad122 --- /dev/null +++ b/crates/common/src/interop/lido/types.rs @@ -0,0 +1,15 @@ +use alloy::sol; + +sol! { + #[allow(missing_docs)] + #[sol(rpc)] + LidoRegistry, + "src/abi/LidoNORegistry.json" +} + +sol! 
{ + #[allow(missing_docs)] + #[sol(rpc)] + LidoCSMRegistry, + "src/abi/LidoCSModuleNORegistry.json" +} diff --git a/crates/common/src/interop/lido/utils.rs b/crates/common/src/interop/lido/utils.rs new file mode 100644 index 00000000..27ece34b --- /dev/null +++ b/crates/common/src/interop/lido/utils.rs @@ -0,0 +1,268 @@ +use std::collections::HashMap; + +use alloy::primitives::{Address, Bytes, U256, address}; +use eyre::Context; + +use crate::{ + interop::lido::types::{ + LidoCSMRegistry::{self, getNodeOperatorSummaryReturn}, + LidoRegistry, + }, + types::{Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule}, +}; + +pub fn lido_registry_addresses_by_module() -> HashMap> { + let mut map: HashMap> = HashMap::new(); + + // --- Mainnet --- + let mut mainnet = HashMap::new(); + mainnet.insert( + MainnetLidoModule::Curated as u8, + address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), + ); + mainnet.insert( + MainnetLidoModule::SimpleDVT as u8, + address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"), + ); + mainnet.insert( + MainnetLidoModule::CommunityStaking as u8, + address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), + ); + map.insert(Chain::Mainnet, mainnet); + + // --- Holesky --- + let mut holesky = HashMap::new(); + holesky.insert( + HoleskyLidoModule::Curated as u8, + address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"), + ); + holesky.insert( + HoleskyLidoModule::SimpleDVT as u8, + address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"), + ); + holesky.insert( + HoleskyLidoModule::Sandbox as u8, + address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"), + ); + holesky.insert( + HoleskyLidoModule::CommunityStaking as u8, + address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"), + ); + map.insert(Chain::Holesky, holesky); + + // --- Hoodi --- + let mut hoodi = HashMap::new(); + hoodi.insert( + HoodiLidoModule::Curated as u8, + address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"), + ); + hoodi.insert( + HoodiLidoModule::SimpleDVT as u8, + 
address!("0B5236BECA68004DB89434462DfC3BB074d2c830"), + ); + hoodi.insert( + HoodiLidoModule::Sandbox as u8, + address!("682E94d2630846a503BDeE8b6810DF71C9806891"), + ); + hoodi.insert( + HoodiLidoModule::CommunityStaking as u8, + address!("79CEf36D84743222f37765204Bec41E92a93E59d"), + ); + map.insert(Chain::Hoodi, hoodi); + + // --- Sepolia -- + let mut sepolia = HashMap::new(); + sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")); + map.insert(Chain::Sepolia, sepolia); + + map +} + +// Fetching appropiate registry address +pub fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result

{ + lido_registry_addresses_by_module() + .get(&chain) + .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? + .get(&lido_module_id) + .copied() + .ok_or_else(|| { + eyre::eyre!("Lido module id {:?} not found for chain: {chain:?}", lido_module_id) + }) +} + +pub fn is_csm_module(chain: Chain, module_id: u8) -> bool { + match chain { + Chain::Mainnet => module_id == MainnetLidoModule::CommunityStaking as u8, + Chain::Holesky => module_id == HoleskyLidoModule::CommunityStaking as u8, + Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8, + _ => false, + } +} + +pub fn get_lido_csm_registry

( + registry_address: Address, + provider: P, +) -> LidoCSMRegistry::LidoCSMRegistryInstance

+where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + LidoCSMRegistry::new(registry_address, provider) +} + +pub fn get_lido_module_registry

( + registry_address: Address, + provider: P, +) -> LidoRegistry::LidoRegistryInstance

+where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + LidoRegistry::new(registry_address, provider) +} + +pub async fn fetch_lido_csm_keys_total

( + registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, + node_operator_id: U256, +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let summary: getNodeOperatorSummaryReturn = + registry.getNodeOperatorSummary(node_operator_id).call().await?; + + let total_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; + + let total_u64 = u64::try_from(total_u256) + .wrap_err_with(|| format!("total keys ({total_u256}) does not fit into u64"))?; + + Ok(total_u64) +} + +pub async fn fetch_lido_module_keys_total

( + registry: &LidoRegistry::LidoRegistryInstance

, + node_operator_id: U256, +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let total_keys: u64 = + registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; + + Ok(total_keys) +} + +pub async fn fetch_lido_csm_keys_batch

( + registry: &LidoCSMRegistry::LidoCSMRegistryInstance

, + node_operator_id: U256, + offset: u64, + limit: u64, +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) + .call() + .await?; + + Ok(pubkeys) +} + +pub async fn fetch_lido_module_keys_batch

( + registry: &LidoRegistry::LidoRegistryInstance

, + node_operator_id: U256, + offset: u64, + limit: u64, +) -> eyre::Result +where + P: Clone + Send + Sync + 'static + alloy::providers::Provider, +{ + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit)) + .call() + .await? + .pubkeys; + + Ok(pubkeys) +} + +#[cfg(test)] +mod tests { + use alloy::{ + primitives::{U256, address}, + providers::ProviderBuilder, + rpc::types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN, + }; + use url::Url; + + use super::*; + use crate::{interop::lido::types::LidoRegistry, types::BlsPublicKey}; + + #[tokio::test] + async fn test_lido_registry_address() -> eyre::Result<()> { + let url = Url::parse("https://ethereum-rpc.publicnode.com")?; + let provider = ProviderBuilder::new().connect_http(url); + + let registry = + LidoRegistry::new(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), provider); + + const LIMIT: usize = 3; + let node_operator_id = U256::from(1); + + let total_keys: u64 = + registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?; + + assert!(total_keys > LIMIT as u64); + + let pubkeys = registry + .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)) + .call() + .await? 
+ .pubkeys; + + let mut vec = vec![]; + for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { + vec.push( + BlsPublicKey::deserialize(chunk) + .map_err(|_| eyre::eyre!("invalid BLS public key"))?, + ); + } + + assert_eq!(vec.len(), LIMIT); + + Ok(()) + } + + #[tokio::test] + async fn test_lido_csm_registry_address() -> eyre::Result<()> { + let url = Url::parse("https://ethereum-rpc.publicnode.com")?; + let provider = ProviderBuilder::new().connect_http(url); + + let registry = + LidoCSMRegistry::new(address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), provider); + + const LIMIT: usize = 3; + let node_operator_id = U256::from(1); + + let summary = registry.getNodeOperatorSummary(node_operator_id).call().await?; + + let total_keys_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount; + let total_keys: u64 = total_keys_u256.try_into()?; + + assert!(total_keys > LIMIT as u64, "expected more than {LIMIT} keys, got {total_keys}"); + + let pubkeys = + registry.getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)).call().await?; + + let mut vec = Vec::new(); + for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) { + vec.push( + BlsPublicKey::deserialize(chunk) + .map_err(|_| eyre::eyre!("invalid BLS public key"))?, + ); + } + + assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len()); + + Ok(()) + } +} diff --git a/crates/common/src/interop/mod.rs b/crates/common/src/interop/mod.rs index 42502f6f..4d0230a9 100644 --- a/crates/common/src/interop/mod.rs +++ b/crates/common/src/interop/mod.rs @@ -1 +1,2 @@ +pub mod lido; pub mod ssv; From 253a91c51445a7cb51ad6689b711d5bd1bb9d31f Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 28 Oct 2025 09:33:08 -0400 Subject: [PATCH 11/14] Moved the Lido address map into a static var --- Cargo.lock | 1 + crates/common/Cargo.toml | 1 + crates/common/src/config/mux.rs | 1 - crates/common/src/interop/lido/utils.rs | 135 ++++++++++++------------ 4 files changed, 71 insertions(+), 67 
deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f034c39c..80c25cf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1625,6 +1625,7 @@ dependencies = [ "eyre", "futures", "jsonwebtoken", + "lazy_static", "pbkdf2 0.12.2", "rand 0.9.2", "rayon", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 57a5fbb3..35367a12 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -26,6 +26,7 @@ ethereum_ssz_derive.workspace = true eyre.workspace = true futures.workspace = true jsonwebtoken.workspace = true +lazy_static.workspace = true lh_eth2.workspace = true lh_eth2_keystore.workspace = true lh_types.workspace = true diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 7922ee8b..27950d1c 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -344,7 +344,6 @@ async fn fetch_lido_csm_registry_keys( ) -> eyre::Result> { let provider = ProviderBuilder::new().connect_client(rpc_client); let registry = get_lido_csm_registry(registry_address, provider); - let total_keys = fetch_lido_csm_keys_total(®istry, node_operator_id).await?; collect_registry_keys(total_keys, |offset, limit| { diff --git a/crates/common/src/interop/lido/utils.rs b/crates/common/src/interop/lido/utils.rs index 27ece34b..02ff7c42 100644 --- a/crates/common/src/interop/lido/utils.rs +++ b/crates/common/src/interop/lido/utils.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use alloy::primitives::{Address, Bytes, U256, address}; use eyre::Context; +use lazy_static::lazy_static; use crate::{ interop::lido::types::{ @@ -11,76 +12,78 @@ use crate::{ types::{Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule}, }; -pub fn lido_registry_addresses_by_module() -> HashMap> { - let mut map: HashMap> = HashMap::new(); - - // --- Mainnet --- - let mut mainnet = HashMap::new(); - mainnet.insert( - MainnetLidoModule::Curated as u8, - address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), - ); - mainnet.insert( - 
MainnetLidoModule::SimpleDVT as u8, - address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"), - ); - mainnet.insert( - MainnetLidoModule::CommunityStaking as u8, - address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), - ); - map.insert(Chain::Mainnet, mainnet); - - // --- Holesky --- - let mut holesky = HashMap::new(); - holesky.insert( - HoleskyLidoModule::Curated as u8, - address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"), - ); - holesky.insert( - HoleskyLidoModule::SimpleDVT as u8, - address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"), - ); - holesky.insert( - HoleskyLidoModule::Sandbox as u8, - address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"), - ); - holesky.insert( - HoleskyLidoModule::CommunityStaking as u8, - address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"), - ); - map.insert(Chain::Holesky, holesky); - - // --- Hoodi --- - let mut hoodi = HashMap::new(); - hoodi.insert( - HoodiLidoModule::Curated as u8, - address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"), - ); - hoodi.insert( - HoodiLidoModule::SimpleDVT as u8, - address!("0B5236BECA68004DB89434462DfC3BB074d2c830"), - ); - hoodi.insert( - HoodiLidoModule::Sandbox as u8, - address!("682E94d2630846a503BDeE8b6810DF71C9806891"), - ); - hoodi.insert( - HoodiLidoModule::CommunityStaking as u8, - address!("79CEf36D84743222f37765204Bec41E92a93E59d"), - ); - map.insert(Chain::Hoodi, hoodi); - - // --- Sepolia -- - let mut sepolia = HashMap::new(); - sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")); - map.insert(Chain::Sepolia, sepolia); - - map +lazy_static! 
{ + static ref LIDO_REGISTRY_ADDRESSES_BY_MODULE: HashMap> = { + let mut map: HashMap> = HashMap::new(); + + // --- Mainnet --- + let mut mainnet = HashMap::new(); + mainnet.insert( + MainnetLidoModule::Curated as u8, + address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), + ); + mainnet.insert( + MainnetLidoModule::SimpleDVT as u8, + address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"), + ); + mainnet.insert( + MainnetLidoModule::CommunityStaking as u8, + address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), + ); + map.insert(Chain::Mainnet, mainnet); + + // --- Holesky --- + let mut holesky = HashMap::new(); + holesky.insert( + HoleskyLidoModule::Curated as u8, + address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"), + ); + holesky.insert( + HoleskyLidoModule::SimpleDVT as u8, + address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"), + ); + holesky.insert( + HoleskyLidoModule::Sandbox as u8, + address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"), + ); + holesky.insert( + HoleskyLidoModule::CommunityStaking as u8, + address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"), + ); + map.insert(Chain::Holesky, holesky); + + // --- Hoodi --- + let mut hoodi = HashMap::new(); + hoodi.insert( + HoodiLidoModule::Curated as u8, + address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"), + ); + hoodi.insert( + HoodiLidoModule::SimpleDVT as u8, + address!("0B5236BECA68004DB89434462DfC3BB074d2c830"), + ); + hoodi.insert( + HoodiLidoModule::Sandbox as u8, + address!("682E94d2630846a503BDeE8b6810DF71C9806891"), + ); + hoodi.insert( + HoodiLidoModule::CommunityStaking as u8, + address!("79CEf36D84743222f37765204Bec41E92a93E59d"), + ); + map.insert(Chain::Hoodi, hoodi); + + // --- Sepolia -- + let mut sepolia = HashMap::new(); + sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")); + map.insert(Chain::Sepolia, sepolia); + + map + }; } // Fetching appropiate registry address pub fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result

{ - lido_registry_addresses_by_module() + LIDO_REGISTRY_ADDRESSES_BY_MODULE .get(&chain) .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))? .get(&lido_module_id) From d42a5f7fb14f97ae7e1e097c89d5e3762f702df0 Mon Sep 17 00:00:00 2001 From: pedro <30592532+PedroCM96@users.noreply.github.com> Date: Tue, 11 Nov 2025 20:54:33 +0100 Subject: [PATCH 12/14] Update crates/common/src/types.rs Co-authored-by: Joe Clapis --- crates/common/src/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 995ceaf8..c16fbf65 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -54,7 +54,7 @@ pub enum HoleskyLidoModule { Curated = 1, SimpleDVT = 2, Sandbox = 3, - CommunityStaking = 4 + CommunityStaking = 4, } pub enum HoodiLidoModule { From 05d4ea9a7aa9393e23d40d13be63df9f24891585 Mon Sep 17 00:00:00 2001 From: pedro <30592532+PedroCM96@users.noreply.github.com> Date: Tue, 11 Nov 2025 20:55:49 +0100 Subject: [PATCH 13/14] Update crates/common/src/types.rs Co-authored-by: Joe Clapis --- crates/common/src/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index c16fbf65..e25ffd7c 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -61,7 +61,7 @@ pub enum HoodiLidoModule { Curated = 1, SimpleDVT = 2, Sandbox = 3, - CommunityStaking = 4 + CommunityStaking = 4, } pub type ForkVersion = [u8; 4]; From 28aa0ca6e3794ce1e22208000dc47fb8bbadc7d2 Mon Sep 17 00:00:00 2001 From: pedro <30592532+PedroCM96@users.noreply.github.com> Date: Tue, 11 Nov 2025 20:56:12 +0100 Subject: [PATCH 14/14] Update crates/common/src/types.rs Co-authored-by: Joe Clapis --- crates/common/src/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index e25ffd7c..89934471 100644 --- a/crates/common/src/types.rs +++ 
b/crates/common/src/types.rs @@ -47,7 +47,7 @@ pub enum Chain { pub enum MainnetLidoModule { Curated = 1, SimpleDVT = 2, - CommunityStaking = 3 + CommunityStaking = 3, } pub enum HoleskyLidoModule {