From 762307eb04e79610b9df8070dd96c96ec9978faa Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 30 Sep 2025 10:26:11 -0400 Subject: [PATCH 01/21] Lock swapped --- bin/pbs.rs | 4 +- config.example.toml | 9 +- crates/common/src/config/mux.rs | 34 +++++-- crates/common/src/config/pbs.rs | 73 ++++++++------ crates/common/src/pbs/constants.rs | 2 + crates/pbs/src/routes/get_header.rs | 2 +- crates/pbs/src/routes/register_validator.rs | 2 +- crates/pbs/src/routes/reload.rs | 4 +- crates/pbs/src/routes/status.rs | 2 +- crates/pbs/src/routes/submit_block.rs | 2 +- crates/pbs/src/service.rs | 104 ++++++++++++++++++-- crates/pbs/src/state.rs | 4 +- tests/src/utils.rs | 4 +- tests/tests/pbs_mux.rs | 2 +- 14 files changed, 193 insertions(+), 55 deletions(-) diff --git a/bin/pbs.rs b/bin/pbs.rs index 69945fe8..9c97f973 100644 --- a/bin/pbs.rs +++ b/bin/pbs.rs @@ -1,11 +1,11 @@ use cb_common::{ - config::{LogsSettings, PBS_MODULE_NAME, load_pbs_config}, + config::{LogsSettings, PBS_MODULE_NAME, PbsMuxes, load_pbs_config}, utils::{initialize_tracing_log, wait_for_signal}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use clap::Parser; use eyre::Result; -use tracing::{error, info}; +use tracing::{error, info, warn}; #[tokio::main] async fn main() -> Result<()> { diff --git a/config.example.toml b/config.example.toml index 5936d2d2..2bfb09aa 100644 --- a/config.example.toml +++ b/config.example.toml @@ -64,6 +64,10 @@ register_validator_retry_limit = 3 # Maximum number of validators to register in a single request. # OPTIONAL, DEFAULT: "" (unlimited) validator_registration_batch_size = "" +# For any Registry-based Mux configurations that have dynamic pubkey +# refreshing enabled, this is how often to refresh the list of pubkeys +# from the registry, in seconds +mux_registry_refresh_interval_seconds = 384 # The PBS module needs one or more [[relays]] as defined below. [[relays]] @@ -126,6 +130,7 @@ validator_pubkeys = [ # - Registry: details of a registry to load keys from. Supported registries: # - Lido: NodeOperatorsRegistry # - SSV: SSV API + You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. 
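# A hedged sketch of how these settings combine (the mux id below is a
# placeholder, not part of this example file): the refresh interval is set
# once alongside the PBS options above, while each registry mux opts in
# through its loader. Per the PBS service, an interval of 0 disables the
# refresher task entirely.
#
#   mux_registry_refresh_interval_seconds = 384
#
#   [[mux]]
#   id = "example-lido-mux"
#   loader = { registry = "lido", node_operator_id = 8, enable_refreshing = true }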
# # Example JSON list: # [ @@ -135,8 +140,8 @@ validator_pubkeys = [ # OPTIONAL loader = "./tests/data/mux_keys.example.json" # loader = { url = "http://localhost:8000/keys" } -# loader = { registry = "lido", node_operator_id = 8 } -# loader = { registry = "ssv", node_operator_id = 8 } +# loader = { registry = "lido", node_operator_id = 8, enable_refreshing = false } +# loader = { registry = "ssv", node_operator_id = 8, enable_refreshing = false } late_in_slot_time_ms = 1500 timeout_get_header_ms = 900 # For each mux, one or more [[mux.relays]] can be defined, which will be used for the matching validator pubkeys diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 15bf9b7e..9ef2307d 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, HashSet}, + hash::Hash, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -23,9 +24,10 @@ use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, pbs::RelayClient, types::{BlsPublicKey, Chain}, + utils::default_bool, }; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PbsMuxes { /// List of PBS multiplexers #[serde(rename = "mux")] @@ -44,7 +46,10 @@ impl PbsMuxes { self, chain: Chain, default_pbs: &PbsConfig, - ) -> eyre::Result> { + ) -> eyre::Result<( + HashMap, + HashMap, + )> { let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); let mut muxes = self.muxes; @@ -76,6 +81,7 @@ impl PbsMuxes { } let mut configs = HashMap::new(); + let mut registry_muxes = HashMap::new(); // fill the configs using the default pbs config and relay entries for mux in muxes { info!( @@ -102,18 +108,30 @@ impl PbsMuxes { config.validate(chain).await?; let config = Arc::new(config); + // Build the map of pubkeys to mux configs let runtime_config = RuntimeMuxConfig { id: mux.id, config, relays: relay_clients }; for pubkey in mux.validator_pubkeys.into_iter() { configs.insert(pubkey, runtime_config.clone()); } + + // Track registry muxes with refreshing enabled + if let Some(loader) = &mux.loader && + let MuxKeysLoader::Registry { enable_refreshing: true, .. 
} = loader + { + info!( + "mux {} uses registry loader with dynamic refreshing enabled", + runtime_config.id + ); + registry_muxes.insert(loader.clone(), runtime_config.clone()); + } } - Ok(configs) + Ok((configs, registry_muxes)) } } /// Configuration for the PBS Multiplexer -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct MuxConfig { /// Identifier for this mux config pub id: String, @@ -156,7 +174,7 @@ impl MuxConfig { } } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] #[serde(untagged)] pub enum MuxKeysLoader { /// A file containing a list of validator pubkeys @@ -167,10 +185,12 @@ pub enum MuxKeysLoader { Registry { registry: NORegistry, node_operator_id: u64, + #[serde(default = "default_bool::")] + enable_refreshing: bool, }, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] pub enum NORegistry { #[serde(alias = "lido")] Lido, @@ -210,7 +230,7 @@ impl MuxKeysLoader { .wrap_err("failed to fetch mux keys from HTTP endpoint") } - Self::Registry { registry, node_operator_id } => match registry { + Self::Registry { registry, node_operator_id, enable_refreshing: _ } => match registry { NORegistry::Lido => { let Some(rpc_url) = rpc_url else { bail!("Lido registry requires RPC URL to be set in the PBS config"); diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index dc11a513..06150dd4 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -21,12 +21,12 @@ use super::{ use crate::{ commit::client::SignerClient, config::{ - CONFIG_ENV, MODULE_JWT_ENV, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, load_env_var, - load_file_from_env, + CONFIG_ENV, MODULE_JWT_ENV, MuxKeysLoader, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, + load_env_var, load_file_from_env, }, pbs::{ - DEFAULT_PBS_PORT, DefaultTimeout, LATE_IN_SLOT_TIME_MS, REGISTER_VALIDATOR_RETRY_LIMIT, - RelayClient, RelayEntry, + DEFAULT_PBS_PORT, DEFAULT_REGISTRY_REFRESH_SECONDS, DefaultTimeout, LATE_IN_SLOT_TIME_MS, + REGISTER_VALIDATOR_RETRY_LIMIT, RelayClient, RelayEntry, }, types::{BlsPublicKey, Chain, Jwt, ModuleId}, utils::{ @@ -133,6 +133,11 @@ pub struct PbsConfig { /// request #[serde(deserialize_with = "empty_string_as_none", default)] pub validator_registration_batch_size: Option, + /// For any Registry-based Mux configurations that have dynamic pubkey + /// refreshing enabled, this is how often to refresh the list of pubkeys + /// from the registry, in seconds + #[serde(default = "default_u64::<{ DEFAULT_REGISTRY_REFRESH_SECONDS }>")] + pub mux_registry_refresh_interval_seconds: u64, } impl PbsConfig { @@ -213,13 +218,15 @@ pub struct PbsModuleConfig { /// List of default relays pub relays: Vec, /// List of all default relays plus additional relays from muxes (based on - /// URL) DO NOT use this for get_header calls, use `relays` or `muxes` + /// URL) DO NOT use this for get_header calls, use `relays` or `mux_lookup` /// instead pub all_relays: Vec, /// Signer client to call Signer API pub signer_client: Option, - /// Muxes config - pub muxes: Option>, + /// List of raw mux details configured, if any + pub registry_muxes: Option>, + /// Lookup of pubkey to mux config + pub mux_lookup: Option>, } fn default_pbs() -> String { @@ -246,19 +253,23 @@ pub async fn load_pbs_config() -> Result { SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port)) }; - let muxes = 
match config.muxes { - Some(muxes) => { - let mux_configs = muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; - Some(mux_configs) - } - None => None, - }; - + // Get the list of relays from the default config let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = + muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -283,7 +294,8 @@ pub async fn load_pbs_config() -> Result { relays: relay_clients, all_relays, signer_client: None, - muxes, + registry_muxes, + mux_lookup, }) } @@ -319,20 +331,24 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC )) }; - let muxes = match cb_config.muxes { - Some(muxes) => Some( - muxes - .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) - .await?, - ), - None => None, - }; - + // Get the list of relays from the default config let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match cb_config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = muxes + .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) + .await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -371,7 +387,8 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC relays: relay_clients, all_relays, signer_client, - muxes, + registry_muxes, + mux_lookup, }, cb_config.pbs.extra, )) diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index 0b43da1a..db03e897 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -34,3 +34,5 @@ pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; // Maximum number of retries for validator registration request per relay pub const REGISTER_VALIDATOR_RETRY_LIMIT: u32 = 3; + +pub const DEFAULT_REGISTRY_REFRESH_SECONDS: u64 = 12 * 32; // One epoch diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 77607c28..55e34d04 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -28,7 +28,7 @@ pub async fn handle_get_header>( tracing::Span::current().record("parent_hash", tracing::field::debug(params.parent_hash)); tracing::Span::current().record("validator", tracing::field::debug(¶ms.pubkey)); - let state = state.read().clone(); + let state = state.read().await.clone(); let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index 
dad85d27..ba5b3466 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -16,7 +16,7 @@ pub async fn handle_register_validator>( req_headers: HeaderMap, Json(registrations): Json>, ) -> Result { - let state = state.read().clone(); + let state = state.read().await.clone(); trace!(?registrations); diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index 86c09273..4f7e6d5e 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -14,7 +14,7 @@ pub async fn handle_reload>( req_headers: HeaderMap, State(state): State>, ) -> Result { - let prev_state = state.read().clone(); + let prev_state = state.read().await.clone(); let ua = get_user_agent(&req_headers); @@ -24,7 +24,7 @@ pub async fn handle_reload>( Ok(new_state) => { info!("config reload successful"); - *state.write() = new_state; + *state.write().await = new_state; BEACON_NODE_STATUS.with_label_values(&["200", RELOAD_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, "OK")) diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index d0f68ac7..b6ac0f91 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -15,7 +15,7 @@ pub async fn handle_get_status>( req_headers: HeaderMap, State(state): State>, ) -> Result { - let state = state.read().clone(); + let state = state.read().await.clone(); let ua = get_user_agent(&req_headers); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 27d2c798..057776ab 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -55,7 +55,7 @@ async fn handle_submit_block_impl>( tracing::Span::current() .record("parent_hash", tracing::field::debug(signed_blinded_block.parent_hash())); - let state = state.read().clone(); + let state = state.read().await.clone(); let now = utcnow_ms(); let slot = signed_blinded_block.slot(); diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 16afb6a7..55b95717 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -1,23 +1,23 @@ -use std::time::Duration; +use std::{collections::HashMap, sync::Arc, time::Duration}; use cb_common::{ + config::PbsModuleConfig, constants::{COMMIT_BOOST_COMMIT, COMMIT_BOOST_VERSION}, pbs::{BUILDER_V1_API_PATH, GET_STATUS_PATH}, types::Chain, }; use cb_metrics::provider::MetricsProvider; use eyre::{Context, Result, bail}; -use parking_lot::RwLock; use prometheus::core::Collector; -use tokio::net::TcpListener; -use tracing::info; +use tokio::{net::TcpListener, sync::RwLock}; +use tracing::{debug, info, warn}; use url::Url; use crate::{ api::BuilderApi, metrics::PBS_METRICS_REGISTRY, routes::create_app_router, - state::{BuilderApiState, PbsState}, + state::{BuilderApiState, PbsState, PbsStateGuard}, }; pub struct PbsService; @@ -25,9 +25,12 @@ pub struct PbsService; impl PbsService { pub async fn run>(state: PbsState) -> Result<()> { let addr = state.config.endpoint; + let registry_refresh_time = + Duration::from_secs(state.config.pbs_config.mux_registry_refresh_interval_seconds); info!(version = COMMIT_BOOST_VERSION, commit_hash = COMMIT_BOOST_COMMIT, ?addr, chain =? state.config.chain, "starting PBS service"); - let app = create_app_router::(RwLock::new(state).into()); + let state = RwLock::new(state).into(); + let app = create_app_router::(state); let listener = TcpListener::bind(addr).await?; let task = @@ -45,6 +48,18 @@ impl PbsService { bail!("PBS server failed to start. 
Are the relays properly configured?"); } + // Run the registry refresher task + let registry_task = { + let state = state.clone(); + let mut interval = tokio::time::interval(registry_refresh_time); + tokio::spawn(async move { + loop { + interval.tick().await; + Self::refresh_registry_muxes(state.clone()).await; + } + }) + }; + task.await? } @@ -55,4 +70,81 @@ impl PbsService { pub fn init_metrics(network: Chain) -> Result<()> { MetricsProvider::load_and_run(network, PBS_METRICS_REGISTRY.clone()) } + + async fn refresh_registry_muxes(state: PbsStateGuard) { + // Read-only portion + let mut new_pubkeys = HashMap::new(); + { + let state = state.read().await; + let config = &state.config; + + // Short circuit if there aren't any registry muxes with dynamic refreshing + let registry_muxes = match &config.registry_muxes { + Some(muxes) => muxes, + None => return, + }; + + // Initialize an empty lookup if the config doesn't have one yet + let mux_lookup = match &config.mux_lookup { + Some(lookup) => lookup, + None => &HashMap::new(), + }; + + // Go through each registry mux and refresh its pubkeys + let default_pbs = &config.pbs_config; + let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); + for (loader, runtime_config) in registry_muxes.iter() { + debug!("refreshing pubkeys for registry mux {}", runtime_config.id); + match loader + .load( + &runtime_config.id, + config.chain, + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await + { + Ok(pubkeys) => { + debug!( + "fetched {} pubkeys for registry mux {}", + pubkeys.len(), + runtime_config.id + ); + + // Add any new pubkeys to the new lookup table + for pubkey in pubkeys { + if mux_lookup.get(&pubkey).is_none() { + new_pubkeys.insert(pubkey, runtime_config.clone()); + } + } + } + Err(err) => { + warn!(%err, "failed to refresh pubkeys for registry mux {}", runtime_config.id); + } + } + } + } + + // Write portion + if new_pubkeys.is_empty() { + return; + } + { + // Since config isn't an RwLock, the option with the least amount of code churn + // is to just clone the whole config and replace the mux_lookup + // field. Cloning the config may be expensive, but this should be a fairly rare + // operation. 
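        // Aside: a minimal sketch of the copy-on-write publish performed below,
        // in a hypothetical standalone form (`publish` and `merged` are
        // illustrative names, not part of this patch). Readers that cloned the
        // previous Arc keep a consistent snapshot; the write lock is held only
        // for the pointer swap.
        //
        //     async fn publish(state: PbsStateGuard, merged: HashMap<BlsPublicKey, RuntimeMuxConfig>) {
        //         let mut guard = state.write().await;
        //         let next = PbsModuleConfig { mux_lookup: Some(merged), ..guard.config.as_ref().clone() };
        //         guard.config = Arc::new(next);
        //     }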
+ let mut state = state.write().await; + let config = state.config.as_ref(); + let new_mux_lookup = match &config.mux_lookup { + Some(existing) => { + new_pubkeys.extend(existing.iter().map(|(k, v)| (k.clone(), v.clone()))); + new_pubkeys + } + None => new_pubkeys, + }; + state.config = + Arc::new(PbsModuleConfig { mux_lookup: Some(new_mux_lookup), ..config.clone() }); + } + } } diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 4403b348..63b3027a 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -5,7 +5,7 @@ use cb_common::{ pbs::RelayClient, types::BlsPublicKey, }; -use parking_lot::RwLock; +use tokio::sync::RwLock; pub trait BuilderApiState: Clone + Sync + Send + 'static {} impl BuilderApiState for () {} @@ -56,7 +56,7 @@ where &self, pubkey: &BlsPublicKey, ) -> (&PbsConfig, &[RelayClient], Option<&str>) { - match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { + match self.config.mux_lookup.as_ref().and_then(|muxes| muxes.get(pubkey)) { Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)), // return only the default relays if there's no match None => (self.pbs_config(), &self.config.relays, None), diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 1c83e0eb..d5e93ff2 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -82,6 +82,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, validator_registration_batch_size: None, + mux_registry_refresh_interval_seconds: 5, } } @@ -97,7 +98,8 @@ pub fn to_pbs_config( signer_client: None, all_relays: relays.clone(), relays, - muxes: None, + registry_muxes: None, + mux_lookup: None, } } diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 28a3d369..0d2067a6 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -44,7 +44,7 @@ async fn test_mux() -> Result<()> { // Bind mux to a specific validator key let validator_pubkey = random_secret().public_key(); - config.muxes = Some(HashMap::from([(validator_pubkey.clone(), mux)])); + config.mux_lookup = Some(HashMap::from([(validator_pubkey.clone(), mux)])); // Run PBS service let state = PbsState::new(config); From eebbae65eeb81863ff4e5ad4d6af47c4586ef2be Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 30 Sep 2025 13:57:47 -0400 Subject: [PATCH 02/21] Finished initial registry refresh task --- bin/pbs.rs | 4 ++-- config.example.toml | 2 +- crates/common/src/signer/store.rs | 2 +- crates/pbs/src/service.rs | 28 ++++++++++++++++++---------- examples/status_api/src/main.rs | 5 ++++- tests/tests/payloads.rs | 11 +++++------ 6 files changed, 31 insertions(+), 21 deletions(-) diff --git a/bin/pbs.rs b/bin/pbs.rs index 9c97f973..69945fe8 100644 --- a/bin/pbs.rs +++ b/bin/pbs.rs @@ -1,11 +1,11 @@ use cb_common::{ - config::{LogsSettings, PBS_MODULE_NAME, PbsMuxes, load_pbs_config}, + config::{LogsSettings, PBS_MODULE_NAME, load_pbs_config}, utils::{initialize_tracing_log, wait_for_signal}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use clap::Parser; use eyre::Result; -use tracing::{error, info, warn}; +use tracing::{error, info}; #[tokio::main] async fn main() -> Result<()> { diff --git a/config.example.toml b/config.example.toml index 2bfb09aa..215059ee 100644 --- a/config.example.toml +++ b/config.example.toml @@ -130,7 +130,7 @@ validator_pubkeys = [ # - Registry: details of a registry to load keys from. 
Supported registries: # - Lido: NodeOperatorsRegistry # - SSV: SSV API - You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. +# You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. # # Example JSON list: # [ diff --git a/crates/common/src/signer/store.rs b/crates/common/src/signer/store.rs index 09b9b91b..7cc0fc17 100644 --- a/crates/common/src/signer/store.rs +++ b/crates/common/src/signer/store.rs @@ -709,7 +709,7 @@ mod test { .join(consensus_signer.pubkey().to_string()) .join("TEST_MODULE") .join("bls") - .join(format!("{}.sig", proxy_signer.pubkey().to_string())) + .join(format!("{}.sig", proxy_signer.pubkey())) ) .unwrap() ) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 55b95717..185c4953 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use cb_common::{ - config::PbsModuleConfig, + config::{MuxKeysLoader, PbsModuleConfig}, constants::{COMMIT_BOOST_COMMIT, COMMIT_BOOST_VERSION}, pbs::{BUILDER_V1_API_PATH, GET_STATUS_PATH}, types::Chain, @@ -25,12 +25,21 @@ pub struct PbsService; impl PbsService { pub async fn run>(state: PbsState) -> Result<()> { let addr = state.config.endpoint; - let registry_refresh_time = - Duration::from_secs(state.config.pbs_config.mux_registry_refresh_interval_seconds); info!(version = COMMIT_BOOST_VERSION, commit_hash = COMMIT_BOOST_COMMIT, ?addr, chain =? state.config.chain, "starting PBS service"); - let state = RwLock::new(state).into(); - let app = create_app_router::(state); + // Check if refreshing registry muxes is required + let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; + let mut is_refreshing_required = false; + if state.config.pbs_config.mux_registry_refresh_interval_seconds == 0 { + info!("registry mux refreshing interval is 0; refreshing is disabled"); + } else if let Some(muxes) = &state.config.registry_muxes { + is_refreshing_required = muxes.iter().any(|(loader, _)| { + matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. }) + }); + } + + let state: Arc>> = RwLock::new(state).into(); + let app = create_app_router::(state.clone()); let listener = TcpListener::bind(addr).await?; let task = @@ -49,16 +58,15 @@ impl PbsService { } // Run the registry refresher task - let registry_task = { - let state = state.clone(); - let mut interval = tokio::time::interval(registry_refresh_time); + if is_refreshing_required { + let mut interval = tokio::time::interval(Duration::from_secs(registry_refresh_time)); tokio::spawn(async move { loop { interval.tick().await; Self::refresh_registry_muxes(state.clone()).await; } - }) - }; + }); + } task.await? 
} diff --git a/examples/status_api/src/main.rs b/examples/status_api/src/main.rs index 7ad9b533..748fd77f 100644 --- a/examples/status_api/src/main.rs +++ b/examples/status_api/src/main.rs @@ -81,7 +81,10 @@ impl BuilderApi for MyBuilderApi { } async fn handle_check(State(state): State>) -> Response { - (StatusCode::OK, format!("Received {count} status requests!", count = state.read().data.get())) + ( + StatusCode::OK, + format!("Received {count} status requests!", count = state.read().await.data.get()), + ) .into_response() } diff --git a/tests/tests/payloads.rs b/tests/tests/payloads.rs index c43df7ef..2a631d22 100644 --- a/tests/tests/payloads.rs +++ b/tests/tests/payloads.rs @@ -30,12 +30,11 @@ fn test_missing_registration_field(field_name: &str) -> String { let mut values: Value = serde_json::from_str(data).unwrap(); // Remove specified field from the first validator's message - if let Value::Array(arr) = &mut values { - if let Some(first_validator) = arr.get_mut(0) { - if let Some(Value::Object(msg_obj)) = first_validator.get_mut("message") { - msg_obj.remove(field_name); - } - } + if let Value::Array(arr) = &mut values && + let Some(first_validator) = arr.get_mut(0) && + let Some(Value::Object(msg_obj)) = first_validator.get_mut("message") + { + msg_obj.remove(field_name); } // This should fail since the field is required From 2f7a94573d74d3f79f58ec3fa8b8feed68d798ab Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 6 Oct 2025 12:39:45 -0400 Subject: [PATCH 03/21] Started adding state to the SSV mock service --- crates/common/src/config/mux.rs | 130 ++++++++++++++++++++++++-------- crates/common/src/config/pbs.rs | 8 ++ crates/pbs/src/service.rs | 1 + tests/src/utils.rs | 1 + 4 files changed, 108 insertions(+), 32 deletions(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index e5c91587..56f92a4f 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -58,8 +58,15 @@ impl PbsMuxes { ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id); if let Some(loader) = &mux.loader { - let extra_keys = - loader.load(&mux.id, chain, default_pbs.rpc_url.clone(), http_timeout).await?; + let extra_keys = loader + .load( + &mux.id, + chain, + default_pbs.ssv_api_url.clone(), + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await?; mux.validator_pubkeys.extend(extra_keys); } @@ -203,6 +210,7 @@ impl MuxKeysLoader { &self, mux_id: &str, chain: Chain, + ssv_api_url: Option, rpc_url: Option, http_timeout: Duration, ) -> eyre::Result> { @@ -245,7 +253,17 @@ impl MuxKeysLoader { .await } NORegistry::SSV => { - fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await + let Some(ssv_api_url) = ssv_api_url else { + bail!("SSV registry requires SSV API URL to be set in the PBS config"); + }; + + fetch_ssv_pubkeys( + ssv_api_url, + chain, + U256::from(*node_operator_id), + http_timeout, + ) + .await } }, }?; @@ -354,6 +372,7 @@ async fn fetch_lido_registry_keys( } async fn fetch_ssv_pubkeys( + api_url: Url, chain: Chain, node_operator_id: U256, http_timeout: Duration, @@ -371,11 +390,12 @@ async fn fetch_ssv_pubkeys( let mut page = 1; loop { - let url = format!( - "https://api.ssv.network/api/v4/{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", + let route = format!( + "{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", ); + let url = api_url.join(&route).wrap_err("failed to construct SSV API URL")?; - let 
response = fetch_ssv_pubkeys_from_url(&url, http_timeout).await?; + let response = fetch_ssv_pubkeys_from_url(url, http_timeout).await?; let fetched = response.validators.len(); pubkeys.extend( response.validators.into_iter().map(|v| v.pubkey).collect::>(), @@ -396,10 +416,7 @@ async fn fetch_ssv_pubkeys( Ok(pubkeys) } -async fn fetch_ssv_pubkeys_from_url( - url: &str, - http_timeout: Duration, -) -> eyre::Result { +async fn fetch_ssv_pubkeys_from_url(url: Url, http_timeout: Duration) -> eyre::Result { let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; let response = client.get(url).send().await.map_err(|e| { if e.is_timeout() { @@ -414,12 +431,13 @@ async fn fetch_ssv_pubkeys_from_url( serde_json::from_slice::(&body_bytes).wrap_err("failed to parse SSV response") } -#[derive(Deserialize)] +#[derive(Deserialize, Serialize)] struct SSVResponse { validators: Vec, pagination: SSVPagination, } +#[derive(Clone)] struct SSVValidator { pubkey: BlsPublicKey, } @@ -443,7 +461,22 @@ impl<'de> Deserialize<'de> for SSVValidator { } } -#[derive(Deserialize)] +impl Serialize for SSVValidator { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator { public_key: self.pubkey.as_hex_string() }; + s.serialize(serializer) + } +} + +#[derive(Deserialize, Serialize)] struct SSVPagination { total: usize, } @@ -453,8 +486,8 @@ mod tests { use std::net::SocketAddr; use alloy::{primitives::U256, providers::ProviderBuilder}; - use axum::{response::Response, routing::get}; - use tokio::{net::TcpListener, task::JoinHandle}; + use axum::{extract::State, response::Response, routing::get}; + use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; use url::Url; use super::*; @@ -505,10 +538,10 @@ mod tests { async fn test_ssv_network_fetch() -> eyre::Result<()> { // Start the mock server let port = 30100; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/ssv"); + let _server_handle = create_mock_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/ssv")).unwrap(); let response = - fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) + fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) .await?; // Make sure the response is correct @@ -541,9 +574,9 @@ mod tests { async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { // Start the mock server let port = 30101; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; + let _server_handle = create_mock_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; // The response should fail due to content length being too big match response { @@ -572,10 +605,10 @@ mod tests { async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { // Start the mock server let port = 30102; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/timeout"); + let _server_handle = create_mock_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/timeout")).unwrap(); let response = - fetch_ssv_pubkeys_from_url(&url, 
Duration::from_secs(TEST_HTTP_TIMEOUT)).await; + fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; // The response should fail due to timeout assert!(response.is_err(), "Expected timeout error, but got success"); @@ -596,9 +629,9 @@ mod tests { // Start the mock server let port = 30103; set_ignore_content_length(true); - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; + let _server_handle = create_mock_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; // The response should fail due to the body being too big match response { @@ -621,13 +654,42 @@ mod tests { Ok(()) } + /// State for the mock server + #[derive(Clone)] + pub struct SsvMockState { + /// Chain to use for the mock server + pub chain: Chain, + + /// Node operator ID to use for the mock server + pub node_operator_id: U256, + + /// List of pubkeys for the mock server to return + pub validators: Arc>>, + + /// Whether to force a timeout response to simulate a server error + pub force_timeout: Arc>, + } + /// Creates a simple mock server to simulate the SSV API endpoint under - /// various conditions for testing - async fn create_mock_server(port: u16) -> Result, axum::Error> { + /// various conditions for testing. Note this ignores + async fn create_mock_server( + port: u16, + state: Option, + ) -> Result, axum::Error> { + let data = include_str!("../../../../tests/data/ssv_valid.json"); + let response = + serde_json::from_str::(data).expect("failed to parse test data"); + let state = state.unwrap_or(SsvMockState { + chain: Chain::Hoodi, + node_operator_id: U256::from(16), + validators: Arc::new(RwLock::new(response.validators)), + force_timeout: Arc::new(RwLock::new(false)), + }); let router = axum::Router::new() .route("/ssv", get(handle_ssv)) .route("/big_data", get(handle_big_data)) .route("/timeout", get(handle_timeout)) + .with_state(state) .into_make_service(); let address = SocketAddr::from(([127, 0, 0, 1], port)); @@ -645,15 +707,19 @@ mod tests { } /// Sends the good SSV JSON data to the client - async fn handle_ssv() -> Response { - // Read the JSON data - let data = include_str!("../../../../tests/data/ssv_valid.json"); + async fn handle_ssv(State(state): State) -> Response { + let response: SSVResponse; + { + let validators = state.validators.read().await; + let pagination = SSVPagination { total: validators.len() }; + response = SSVResponse { validators: validators.clone(), pagination }; + } // Create a valid response Response::builder() .status(200) .header("Content-Type", "application/json") - .body(data.into()) + .body(serde_json::to_string(&response).unwrap().into()) .unwrap() } diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 8fa5cb97..27d4cfc0 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -123,6 +123,9 @@ pub struct PbsConfig { pub extra_validation_enabled: bool, /// Execution Layer RPC url to use for extra validation pub rpc_url: Option, + /// URL for the SSV network API + #[serde(default = "default_ssv_api_url")] + pub ssv_api_url: Option, /// Timeout for HTTP requests in seconds #[serde(default = "default_u64::")] pub http_timeout_seconds: u64, @@ -393,3 +396,8 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC 
cb_config.pbs.extra, )) } + +/// Default URL for the SSV network API +fn default_ssv_api_url() -> Option { + Some(Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid")) +} diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 185c4953..2a91407b 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -107,6 +107,7 @@ impl PbsService { .load( &runtime_config.id, config.chain, + default_pbs.ssv_api_url.clone(), default_pbs.rpc_url.clone(), http_timeout, ) diff --git a/tests/src/utils.rs b/tests/src/utils.rs index d5e93ff2..58611d82 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -78,6 +78,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, extra_validation_enabled: false, + ssv_api_url: None, rpc_url: None, http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, From 7ed808a9724d50d9c71102884d69abffcf073e84 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 6 Oct 2025 13:04:01 -0400 Subject: [PATCH 04/21] A little mock cleanup --- crates/common/src/config/mux.rs | 60 +++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 56f92a4f..b485441c 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -486,7 +486,11 @@ mod tests { use std::net::SocketAddr; use alloy::{primitives::U256, providers::ProviderBuilder}; - use axum::{extract::State, response::Response, routing::get}; + use axum::{ + extract::{Path, State}, + response::Response, + routing::get, + }; use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; use url::Url; @@ -539,7 +543,9 @@ mod tests { // Start the mock server let port = 30100; let _server_handle = create_mock_server(port, None).await?; - let url = Url::parse(&format!("http://localhost:{port}/ssv")).unwrap(); + let url = + Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) .await?; @@ -574,7 +580,7 @@ mod tests { async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { // Start the mock server let port = 30101; - let _server_handle = create_mock_server(port, None).await?; + let server_handle = create_mock_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; @@ -594,7 +600,7 @@ mod tests { } // Clean up the server handle - _server_handle.abort(); + server_handle.abort(); Ok(()) } @@ -605,8 +611,14 @@ mod tests { async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { // Start the mock server let port = 30102; - let _server_handle = create_mock_server(port, None).await?; - let url = Url::parse(&format!("http://localhost:{port}/timeout")).unwrap(); + let state = SsvMockState { + validators: Arc::new(RwLock::new(vec![])), + force_timeout: Arc::new(RwLock::new(true)), + }; + let server_handle = create_mock_server(port, Some(state)).await?; + let url = + Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; @@ -617,7 +629,7 @@ mod tests { } // Clean up the server handle - _server_handle.abort(); + server_handle.abort(); Ok(()) } @@ -629,7 +641,7 @@ mod tests { // 
Start the mock server let port = 30103; set_ignore_content_length(true); - let _server_handle = create_mock_server(port, None).await?; + let server_handle = create_mock_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; @@ -649,7 +661,7 @@ mod tests { } // Clean up the server handle - _server_handle.abort(); + server_handle.abort(); Ok(()) } @@ -657,12 +669,6 @@ mod tests { /// State for the mock server #[derive(Clone)] pub struct SsvMockState { - /// Chain to use for the mock server - pub chain: Chain, - - /// Node operator ID to use for the mock server - pub node_operator_id: U256, - /// List of pubkeys for the mock server to return pub validators: Arc>>, @@ -672,7 +678,7 @@ mod tests { /// Creates a simple mock server to simulate the SSV API endpoint under /// various conditions for testing. Note this ignores - async fn create_mock_server( + pub async fn create_mock_server( port: u16, state: Option, ) -> Result, axum::Error> { @@ -680,15 +686,15 @@ mod tests { let response = serde_json::from_str::(data).expect("failed to parse test data"); let state = state.unwrap_or(SsvMockState { - chain: Chain::Hoodi, - node_operator_id: U256::from(16), validators: Arc::new(RwLock::new(response.validators)), force_timeout: Arc::new(RwLock::new(false)), }); let router = axum::Router::new() - .route("/ssv", get(handle_ssv)) + .route( + "/{chain_name}/validators/in_operator/{node_operator_id}", + get(handle_validators), + ) .route("/big_data", get(handle_big_data)) - .route("/timeout", get(handle_timeout)) .with_state(state) .into_make_service(); @@ -706,8 +712,18 @@ mod tests { result } - /// Sends the good SSV JSON data to the client - async fn handle_ssv(State(state): State) -> Response { + /// Returns a valid SSV validators response, or a timeout if requested in + /// the server state + async fn handle_validators( + State(state): State, + Path((_, _)): Path<(String, u64)>, + ) -> Response { + // Time out if requested + if *state.force_timeout.read().await { + return handle_timeout().await; + } + + // Generate the response based on the current validators let response: SSVResponse; { let validators = state.validators.read().await; From 2d2bc7f7b1a66856d685bcab633bdf4be31cffe4 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 6 Oct 2025 16:43:30 -0400 Subject: [PATCH 05/21] Broke SSV out into its own module --- Cargo.lock | 28 ++- crates/common/Cargo.toml | 3 + crates/common/src/config/mux.rs | 308 +------------------------ crates/common/src/interop/mod.rs | 1 + crates/common/src/interop/ssv/mod.rs | 2 + crates/common/src/interop/ssv/types.rs | 61 +++++ crates/common/src/interop/ssv/utils.rs | 24 ++ crates/common/src/lib.rs | 1 + crates/common/src/utils.rs | 15 +- tests/Cargo.toml | 4 + tests/src/lib.rs | 1 + tests/src/mock_ssv.rs | 108 +++++++++ tests/src/utils.rs | 6 +- tests/tests/pbs_mux.rs | 170 +++++++++++++- 14 files changed, 413 insertions(+), 319 deletions(-) create mode 100644 crates/common/src/interop/mod.rs create mode 100644 crates/common/src/interop/ssv/mod.rs create mode 100644 crates/common/src/interop/ssv/types.rs create mode 100644 crates/common/src/interop/ssv/utils.rs create mode 100644 tests/src/mock_ssv.rs diff --git a/Cargo.lock b/Cargo.lock index e3fc4fa8..906a8507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1732,6 +1732,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "tracing-test", "tree_hash", "types", "url", @@ -2747,7 
+2748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -5451,7 +5452,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6271,7 +6272,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6809,6 +6810,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.106", +] + [[package]] name = "tree_hash" version = "0.10.0" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index ea5c0199..57a5fbb3 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -5,6 +5,9 @@ publish = false rust-version.workspace = true version.workspace = true +[features] +testing-flags = [] + [dependencies] aes.workspace = true alloy.workspace = true diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index b485441c..bbaff08c 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -15,13 +15,14 @@ use alloy::{ }; use eyre::{Context, bail, ensure}; use reqwest::Client; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use tracing::{debug, info, warn}; use url::Url; use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, + interop::ssv::utils::fetch_ssv_pubkeys_from_url, pbs::RelayClient, types::{BlsPublicKey, Chain}, utils::default_bool, @@ -416,91 +417,12 @@ async fn fetch_ssv_pubkeys( Ok(pubkeys) } -async fn fetch_ssv_pubkeys_from_url(url: Url, http_timeout: Duration) -> eyre::Result { - let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; - let response = client.get(url).send().await.map_err(|e| { - if e.is_timeout() { - eyre::eyre!("Request to SSV network API timed out: {e}") - } else { - eyre::eyre!("Error sending request to SSV network API: {e}") - } - })?; - - // Parse the response as JSON - let body_bytes = safe_read_http_response(response).await?; - serde_json::from_slice::(&body_bytes).wrap_err("failed to parse SSV response") -} - -#[derive(Deserialize, Serialize)] -struct SSVResponse { - validators: Vec, - pagination: SSVPagination, -} - -#[derive(Clone)] -struct SSVValidator { - pubkey: BlsPublicKey, -} - -impl<'de> Deserialize<'de> for SSVValidator { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct SSVValidator { - public_key: String, - } - - let s = SSVValidator::deserialize(deserializer)?; - let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; - let pubkey = BlsPublicKey::deserialize(&bytes) - .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; - - Ok(Self { pubkey }) - } -} - -impl Serialize for SSVValidator { 
- fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - #[derive(Serialize)] - struct SSVValidator { - public_key: String, - } - - let s = SSVValidator { public_key: self.pubkey.as_hex_string() }; - s.serialize(serializer) - } -} - -#[derive(Deserialize, Serialize)] -struct SSVPagination { - total: usize, -} - #[cfg(test)] mod tests { - use std::net::SocketAddr; - use alloy::{primitives::U256, providers::ProviderBuilder}; - use axum::{ - extract::{Path, State}, - response::Response, - routing::get, - }; - use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; use url::Url; use super::*; - use crate::{ - config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH}, - utils::{ResponseReadError, bls_pubkey_from_hex_unchecked, set_ignore_content_length}, - }; - - const TEST_HTTP_TIMEOUT: u64 = 2; #[tokio::test] async fn test_lido_registry_address() -> eyre::Result<()> { @@ -536,230 +458,4 @@ mod tests { Ok(()) } - - #[tokio::test] - /// Tests that a successful SSV network fetch is handled and parsed properly - async fn test_ssv_network_fetch() -> eyre::Result<()> { - // Start the mock server - let port = 30100; - let _server_handle = create_mock_server(port, None).await?; - let url = - Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) - .unwrap(); - let response = - fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) - .await?; - - // Make sure the response is correct - // NOTE: requires that ssv_data.json dpesn't change - assert_eq!(response.validators.len(), 3); - let expected_pubkeys = [ - bls_pubkey_from_hex_unchecked( - "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a", - ), - bls_pubkey_from_hex_unchecked( - "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c", - ), - bls_pubkey_from_hex_unchecked( - "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639", - ), - ]; - for (i, validator) in response.validators.iter().enumerate() { - assert_eq!(validator.pubkey, expected_pubkeys[i]); - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// body is too large - async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { - // Start the mock server - let port = 30101; - let server_handle = create_mock_server(port, None).await?; - let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); - let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; - - // The response should fail due to content length being too big - match response { - Ok(_) => { - panic!("Expected an error due to big content length, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - assert!(*content_length > MUXER_HTTP_MAX_LENGTH); - assert!(raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - - // Clean up the server handle - server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the request - /// times out - async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { - // Start the mock server - let port = 30102; - let state = SsvMockState { - validators: 
Arc::new(RwLock::new(vec![])), - force_timeout: Arc::new(RwLock::new(true)), - }; - let server_handle = create_mock_server(port, Some(state)).await?; - let url = - Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) - .unwrap(); - let response = - fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; - - // The response should fail due to timeout - assert!(response.is_err(), "Expected timeout error, but got success"); - if let Err(e) = response { - assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e); - } - - // Clean up the server handle - server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// content-length header is missing - async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> { - // Start the mock server - let port = 30103; - set_ignore_content_length(true); - let server_handle = create_mock_server(port, None).await?; - let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); - let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; - - // The response should fail due to the body being too big - match response { - Ok(_) => { - panic!("Expected an error due to excessive data, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - assert_eq!(*content_length, 0); - assert!(!raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - - // Clean up the server handle - server_handle.abort(); - - Ok(()) - } - - /// State for the mock server - #[derive(Clone)] - pub struct SsvMockState { - /// List of pubkeys for the mock server to return - pub validators: Arc>>, - - /// Whether to force a timeout response to simulate a server error - pub force_timeout: Arc>, - } - - /// Creates a simple mock server to simulate the SSV API endpoint under - /// various conditions for testing. 
Note this ignores - pub async fn create_mock_server( - port: u16, - state: Option, - ) -> Result, axum::Error> { - let data = include_str!("../../../../tests/data/ssv_valid.json"); - let response = - serde_json::from_str::(data).expect("failed to parse test data"); - let state = state.unwrap_or(SsvMockState { - validators: Arc::new(RwLock::new(response.validators)), - force_timeout: Arc::new(RwLock::new(false)), - }); - let router = axum::Router::new() - .route( - "/{chain_name}/validators/in_operator/{node_operator_id}", - get(handle_validators), - ) - .route("/big_data", get(handle_big_data)) - .with_state(state) - .into_make_service(); - - let address = SocketAddr::from(([127, 0, 0, 1], port)); - let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; - let server = axum::serve(listener, router).with_graceful_shutdown(async { - tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); - }); - let result = Ok(tokio::spawn(async move { - if let Err(e) = server.await { - eprintln!("Server error: {}", e); - } - })); - info!("Mock server started on http://localhost:{port}/"); - result - } - - /// Returns a valid SSV validators response, or a timeout if requested in - /// the server state - async fn handle_validators( - State(state): State, - Path((_, _)): Path<(String, u64)>, - ) -> Response { - // Time out if requested - if *state.force_timeout.read().await { - return handle_timeout().await; - } - - // Generate the response based on the current validators - let response: SSVResponse; - { - let validators = state.validators.read().await; - let pagination = SSVPagination { total: validators.len() }; - response = SSVResponse { validators: validators.clone(), pagination }; - } - - // Create a valid response - Response::builder() - .status(200) - .header("Content-Type", "application/json") - .body(serde_json::to_string(&response).unwrap().into()) - .unwrap() - } - - /// Sends a response with a large body - larger than the maximum allowed. - /// Note that hyper overwrites the content-length header automatically, so - /// setting it here wouldn't actually change the value that ultimately - /// gets sent to the server. 
- async fn handle_big_data() -> Response { - let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH); - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body(body.into()) - .unwrap() - } - - /// Simulates a timeout by sleeping for a long time - async fn handle_timeout() -> Response { - // Sleep for a long time to simulate a timeout - tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await; - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body("Timeout response".into()) - .unwrap() - } } diff --git a/crates/common/src/interop/mod.rs b/crates/common/src/interop/mod.rs new file mode 100644 index 00000000..42502f6f --- /dev/null +++ b/crates/common/src/interop/mod.rs @@ -0,0 +1 @@ +pub mod ssv; diff --git a/crates/common/src/interop/ssv/mod.rs b/crates/common/src/interop/ssv/mod.rs new file mode 100644 index 00000000..b4ab6a6a --- /dev/null +++ b/crates/common/src/interop/ssv/mod.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod utils; diff --git a/crates/common/src/interop/ssv/types.rs b/crates/common/src/interop/ssv/types.rs new file mode 100644 index 00000000..b8ac2e23 --- /dev/null +++ b/crates/common/src/interop/ssv/types.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::types::BlsPublicKey; + +/// Response from the SSV API for validators +#[derive(Deserialize, Serialize)] +pub struct SSVResponse { + /// List of validators returned by the SSV API + pub validators: Vec, + + /// Pagination information + pub pagination: SSVPagination, +} + +/// Representation of a validator in the SSV API +#[derive(Clone)] +pub struct SSVValidator { + /// The public key of the validator + pub pubkey: BlsPublicKey, +} + +impl<'de> Deserialize<'de> for SSVValidator { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator::deserialize(deserializer)?; + let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; + let pubkey = BlsPublicKey::deserialize(&bytes) + .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; + + Ok(Self { pubkey }) + } +} + +impl Serialize for SSVValidator { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator { public_key: self.pubkey.as_hex_string() }; + s.serialize(serializer) + } +} + +/// Pagination information from the SSV API +#[derive(Deserialize, Serialize)] +pub struct SSVPagination { + /// Total number of validators available + pub total: usize, +} diff --git a/crates/common/src/interop/ssv/utils.rs b/crates/common/src/interop/ssv/utils.rs new file mode 100644 index 00000000..e443e018 --- /dev/null +++ b/crates/common/src/interop/ssv/utils.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use eyre::Context; +use url::Url; + +use crate::{config::safe_read_http_response, interop::ssv::types::SSVResponse}; + +pub async fn fetch_ssv_pubkeys_from_url( + url: Url, + http_timeout: Duration, +) -> eyre::Result { + let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; + let response = client.get(url).send().await.map_err(|e| { + if e.is_timeout() { + eyre::eyre!("Request to SSV network API timed out: {e}") + } else { + eyre::eyre!("Error sending request to SSV network API: {e}") + } + })?; + + // Parse the response as JSON + let body_bytes = 
safe_read_http_response(response).await?;
+    serde_json::from_slice::<SSVResponse>(&body_bytes).wrap_err("failed to parse SSV response")
+}
diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs
index 1fe1f26a..462dcec1 100644
--- a/crates/common/src/lib.rs
+++ b/crates/common/src/lib.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 pub mod commit;
 pub mod config;
 pub mod constants;
+pub mod interop;
 pub mod pbs;
 pub mod signature;
 pub mod signer;
diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs
index d972477b..764ab188 100644
--- a/crates/common/src/utils.rs
+++ b/crates/common/src/utils.rs
@@ -1,4 +1,4 @@
-#[cfg(test)]
+#[cfg(feature = "testing-flags")]
 use std::cell::Cell;
 use std::{
     net::Ipv4Addr,
@@ -43,17 +43,18 @@ pub enum ResponseReadError {
     ReqwestError(#[from] reqwest::Error),
 }
 
-#[cfg(test)]
+#[cfg(feature = "testing-flags")]
 thread_local! {
     static IGNORE_CONTENT_LENGTH: Cell<bool> = const { Cell::new(false) };
 }
 
-#[cfg(test)]
+#[cfg(feature = "testing-flags")]
 pub fn set_ignore_content_length(val: bool) {
     IGNORE_CONTENT_LENGTH.with(|f| f.set(val));
 }
 
-#[cfg(test)]
+#[cfg(feature = "testing-flags")]
+#[allow(dead_code)]
 fn should_ignore_content_length() -> bool {
     IGNORE_CONTENT_LENGTH.with(|f| f.get())
 }
@@ -65,13 +66,13 @@ pub async fn read_chunked_body_with_max(
     max_size: usize,
 ) -> Result<Vec<u8>, ResponseReadError> {
     // Get the content length from the response headers
-    #[cfg(not(test))]
+    #[cfg(not(feature = "testing-flags"))]
     let content_length = res.content_length();
 
-    #[cfg(test)]
+    #[cfg(feature = "testing-flags")]
     let mut content_length = res.content_length();
 
-    #[cfg(test)]
+    #[cfg(feature = "testing-flags")]
     if should_ignore_content_length() {
         // Used for testing purposes to ignore content length
         content_length = None;
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index 3195cc11..5e8e1596 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -18,5 +18,9 @@ tempfile.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
+tracing-test.workspace = true
 tree_hash.workspace = true
 url.workspace = true
+
+[dev-dependencies]
+cb-common = { path = "../crates/common", features = ["testing-flags"] }
\ No newline at end of file
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index a4fbbb6a..42e36a8e 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -1,3 +1,4 @@
 pub mod mock_relay;
+pub mod mock_ssv;
 pub mod mock_validator;
 pub mod utils;
diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs
new file mode 100644
index 00000000..dcaea16d
--- /dev/null
+++ b/tests/src/mock_ssv.rs
@@ -0,0 +1,108 @@
+use std::{net::SocketAddr, sync::Arc};
+
+use axum::{
+    extract::{Path, State},
+    response::Response,
+    routing::get,
+};
+use cb_common::{
+    config::MUXER_HTTP_MAX_LENGTH,
+    interop::ssv::types::{SSVPagination, SSVResponse, SSVValidator},
+};
+use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};
+use tracing::info;
+
+pub const TEST_HTTP_TIMEOUT: u64 = 2;
+
+/// State for the mock server
+#[derive(Clone)]
+pub struct SsvMockState {
+    /// List of pubkeys for the mock server to return
+    pub validators: Arc<RwLock<Vec<SSVValidator>>>,
+
+    /// Whether to force a timeout response to simulate a server error
+    pub force_timeout: Arc<RwLock<bool>>,
+}
+
+/// Creates a simple mock server to simulate the SSV API endpoint under
+/// various conditions for testing.
+pub async fn create_mock_server(
+    port: u16,
+    state: Option<SsvMockState>,
+) -> Result<JoinHandle<()>, axum::Error> {
+    let data = include_str!("../../tests/data/ssv_valid.json");
+    let response = serde_json::from_str::<SSVResponse>(data).expect("failed to parse test data");
+    let state = state.unwrap_or(SsvMockState {
+        validators: Arc::new(RwLock::new(response.validators)),
+        force_timeout: Arc::new(RwLock::new(false)),
+    });
+    let router = axum::Router::new()
+        .route("/{chain_name}/validators/in_operator/{node_operator_id}", get(handle_validators))
+        .route("/big_data", get(handle_big_data))
+        .with_state(state)
+        .into_make_service();
+
+    let address = SocketAddr::from(([127, 0, 0, 1], port));
+    let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?;
+    let server = axum::serve(listener, router).with_graceful_shutdown(async {
+        tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal");
+    });
+    let result = Ok(tokio::spawn(async move {
+        if let Err(e) = server.await {
+            eprintln!("Server error: {}", e);
+        }
+    }));
+    info!("Mock server started on http://localhost:{port}/");
+    result
+}
+
+/// Returns a valid SSV validators response, or a timeout if requested in
+/// the server state
+async fn handle_validators(
+    State(state): State<SsvMockState>,
+    Path((_, _)): Path<(String, u64)>,
+) -> Response {
+    // Time out if requested
+    if *state.force_timeout.read().await {
+        return handle_timeout().await;
+    }
+
+    // Generate the response based on the current validators
+    let response: SSVResponse;
+    {
+        let validators = state.validators.read().await;
+        let pagination = SSVPagination { total: validators.len() };
+        response = SSVResponse { validators: validators.clone(), pagination };
+    }
+
+    // Create a valid response
+    Response::builder()
+        .status(200)
+        .header("Content-Type", "application/json")
+        .body(serde_json::to_string(&response).unwrap().into())
+        .unwrap()
+}
+
+/// Sends a response with a large body - larger than the maximum allowed.
+/// Note that hyper overwrites the content-length header automatically, so
+/// setting it here wouldn't actually change the value that ultimately
+/// gets sent to the client.
+async fn handle_big_data() -> Response {
+    let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH);
+    Response::builder()
+        .status(200)
+        .header("Content-Type", "application/text")
+        .body(body.into())
+        .unwrap()
+}
+
+/// Simulates a timeout by sleeping for a long time
+async fn handle_timeout() -> Response {
+    // Sleep for a long time to simulate a timeout
+    tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await;
+    Response::builder()
+        .status(200)
+        .header("Content-Type", "application/text")
+        .body("Timeout response".into())
+        .unwrap()
+}
diff --git a/tests/src/utils.rs b/tests/src/utils.rs
index 58611d82..af4f80c3 100644
--- a/tests/src/utils.rs
+++ b/tests/src/utils.rs
@@ -14,7 +14,7 @@ use cb_common::{
     pbs::{RelayClient, RelayEntry},
     signer::SignerLoader,
     types::{BlsPublicKey, Chain, ModuleId},
-    utils::default_host,
+    utils::{bls_pubkey_from_hex, default_host},
 };
 use eyre::Result;
 
@@ -134,3 +134,7 @@ pub fn get_start_signer_config(
         _ => panic!("Only local signers are supported in tests"),
     }
 }
+
+pub fn bls_pubkey_from_hex_unchecked(hex: &str) -> BlsPublicKey {
+    bls_pubkey_from_hex(hex).unwrap()
+}
diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs
index 0d2067a6..731fa3be 100644
--- a/tests/tests/pbs_mux.rs
+++ b/tests/tests/pbs_mux.rs
@@ -1,15 +1,152 @@
 use std::{collections::HashMap, sync::Arc, time::Duration};
 
-use cb_common::{config::RuntimeMuxConfig, signer::random_secret, types::Chain};
+use cb_common::{
+    config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, RuntimeMuxConfig},
+    interop::ssv::utils::fetch_ssv_pubkeys_from_url,
+    signer::random_secret,
+    types::Chain,
+    utils::{ResponseReadError, set_ignore_content_length},
+};
 use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
 use cb_tests::{
     mock_relay::{MockRelayState, start_mock_relay_service},
+    mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_server},
     mock_validator::MockValidator,
-    utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config},
+    utils::{
+        bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_static_config, setup_test_env,
+        to_pbs_config,
+    },
 };
 use eyre::Result;
 use reqwest::StatusCode;
+use tokio::sync::RwLock;
 use tracing::info;
+use url::Url;
+
+#[tokio::test]
+/// Tests that a successful SSV network fetch is handled and parsed properly
+async fn test_ssv_network_fetch() -> eyre::Result<()> {
+    // Start the mock server
+    let port = 30100;
+    let _server_handle = create_mock_server(port, None).await?;
+    let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1"))
+        .unwrap();
+    let response =
+        fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?;
+
+    // Make sure the response is correct
+    // NOTE: requires that ssv_valid.json doesn't change
+    assert_eq!(response.validators.len(), 3);
+    let expected_pubkeys = [
+        bls_pubkey_from_hex_unchecked(
+            "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a",
+        ),
+        bls_pubkey_from_hex_unchecked(
+            "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c",
+        ),
+        bls_pubkey_from_hex_unchecked(
+            "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639",
+        ),
+    ];
+    for (i, validator) in response.validators.iter().enumerate() {
+        assert_eq!(validator.pubkey, expected_pubkeys[i]);
+    }
+
+    // Clean up the server handle
+    _server_handle.abort();
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Tests that the SSV network fetch is handled properly when the response's
+/// body is too large
+async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> {
+    // Start the mock server
+    let port = 30101;
+    let server_handle = cb_tests::mock_ssv::create_mock_server(port, None).await?;
+    let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap();
+    let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await;
+
+    // The response should fail due to content length being too big
+    match response {
+        Ok(_) => {
+            panic!("Expected an error due to big content length, but got a successful response")
+        }
+        Err(e) => match e.downcast_ref::<ResponseReadError>() {
+            Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
+                assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
+                assert!(*content_length > MUXER_HTTP_MAX_LENGTH);
+                assert!(raw.is_empty());
+            }
+            _ => panic!("Expected PayloadTooLarge error, got: {}", e),
+        },
+    }
+
+    // Clean up the server handle
+    server_handle.abort();
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Tests that the SSV network fetch is handled properly when the request
+/// times out
+async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> {
+    // Start the mock server
+    let port = 30102;
+    let state = SsvMockState {
+        validators: Arc::new(RwLock::new(vec![])),
+        force_timeout: Arc::new(RwLock::new(true)),
+    };
+    let server_handle = create_mock_server(port, Some(state)).await?;
+    let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1"))
+        .unwrap();
+    let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await;
+
+    // The response should fail due to timeout
+    assert!(response.is_err(), "Expected timeout error, but got success");
+    if let Err(e) = response {
+        assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e);
+    }
+
+    // Clean up the server handle
+    server_handle.abort();
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Tests that the SSV network fetch is handled properly when the response's
+/// content-length header is missing
+async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> {
+    // Start the mock server
+    let port = 30103;
+    set_ignore_content_length(true);
+    let server_handle = create_mock_server(port, None).await?;
+    let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap();
+    let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await;
+
+    // The response should fail due to the body being too big
+    match response {
+        Ok(_) => {
+            panic!("Expected an error due to excessive data, but got a successful response")
+        }
+        Err(e) => match e.downcast_ref::<ResponseReadError>() {
+            Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
+                assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
+                assert_eq!(*content_length, 0);
+                assert!(!raw.is_empty());
+            }
+            _ => panic!("Expected PayloadTooLarge error, got: {}", e),
+        },
+    }
+
+    // Clean up the server handle
+    server_handle.abort();
+
+    Ok(())
+}
 
 #[tokio::test]
 async fn test_mux() -> Result<()> {
@@ -89,3 +226,32 @@ async fn test_mux() -> Result<()> {
 
     Ok(())
 }
+
+/*
+#[tokio::test]
+#[tracing_test::traced_test]
+async fn test_auto_refresh() -> Result<()> {
+    // This test reads the log files to verify behavior, so we can't attach a global
+    // trace listener setup_test_env();
+
+    let signer = random_secret();
+    let pubkey = signer.public_key();
+
+    let chain = Chain::Hoodi;
+    let pbs_port = 3710;
+
+    // Start the mock SSV API server
+    let ssv_api_port = 3711;
+    let ssv_api_url = format!("http://localhost:{ssv_api_port}");
+    let mock_ssv_state = SsvMockState {
+        validators: Arc::new(RwLock::new(vec![pubkey])),
+        force_timeout: Arc::new(RwLock::new(false)),
+    };
+
+    // Run PBS service
+    let state = PbsState::new(config);
+    tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));
+
+    Ok(())
+}
+*/

From 362820426f94afe18282a97b7ae62809c49b307d Mon Sep 17 00:00:00 2001
From: Joe Clapis
Date: Tue, 7 Oct 2025 10:48:42 -0400
Subject: [PATCH 06/21] First working SSV dynamic refresh test

---
 crates/pbs/src/service.rs      |   4 ++
 tests/src/mock_ssv.rs          |   2 +-
 tests/tests/pbs_mux.rs         |  47 +++-----------
 tests/tests/pbs_mux_refresh.rs | 115 +++++++++++++++++++++++++++++++++
 4 files changed, 129 insertions(+), 39 deletions(-)
 create mode 100644 tests/tests/pbs_mux_refresh.rs

diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs
index 2a91407b..b8a1ffeb 100644
--- a/crates/pbs/src/service.rs
+++ b/crates/pbs/src/service.rs
@@ -138,6 +138,10 @@ impl PbsService {
         if new_pubkeys.is_empty() {
             return;
         }
+        // Log the new pubkeys
+        for (pubkey, runtime_config) in new_pubkeys.iter() {
+            info!("adding new pubkey {pubkey} to mux {}", runtime_config.id);
+        }
         {
             // Since config isn't an RwLock, the option with the least amount of code churn
             // is to just clone the whole config and replace the mux_lookup
diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs
index dcaea16d..00b4de26 100644
--- a/tests/src/mock_ssv.rs
+++ b/tests/src/mock_ssv.rs
@@ -26,7 +26,7 @@ pub struct SsvMockState {
 
 /// Creates a simple mock server to simulate the SSV API endpoint under
 /// various conditions for testing. Note this ignores
-pub async fn create_mock_server(
+pub async fn create_mock_ssv_server(
     port: u16,
     state: Option<SsvMockState>,
 ) -> Result<JoinHandle<()>, axum::Error> {
diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs
index 731fa3be..5cde1158 100644
--- a/tests/tests/pbs_mux.rs
+++ b/tests/tests/pbs_mux.rs
@@ -10,7 +10,7 @@ use cb_common::{
 use cb_pbs::{DefaultBuilderApi, PbsService, PbsState};
 use cb_tests::{
     mock_relay::{MockRelayState, start_mock_relay_service},
-    mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_server},
+    mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_ssv_server},
     mock_validator::MockValidator,
     utils::{
         bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_static_config, setup_test_env,
@@ -25,10 +25,10 @@
 
 #[tokio::test]
 /// Tests that a successful SSV network fetch is handled and parsed properly
-async fn test_ssv_network_fetch() -> eyre::Result<()> {
+async fn test_ssv_network_fetch() -> Result<()> {
     // Start the mock server
     let port = 30100;
-    let _server_handle = create_mock_server(port, None).await?;
+    let _server_handle = create_mock_ssv_server(port, None).await?;
     let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1"))
         .unwrap();
     let response =
@@ -61,10 +61,10 @@
 #[tokio::test]
 /// Tests that the SSV network fetch is handled properly when the response's
 /// body is too large
-async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> {
+async fn test_ssv_network_fetch_big_data() -> Result<()> {
     // Start the mock server
     let port = 30101;
-    let server_handle = cb_tests::mock_ssv::create_mock_server(port, None).await?;
+    let server_handle = cb_tests::mock_ssv::create_mock_ssv_server(port, None).await?;
     let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap();
     let response = fetch_ssv_pubkeys_from_url(url, 
Duration::from_secs(120)).await; @@ -92,14 +92,14 @@ async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { #[tokio::test] /// Tests that the SSV network fetch is handled properly when the request /// times out -async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { +async fn test_ssv_network_fetch_timeout() -> Result<()> { // Start the mock server let port = 30102; let state = SsvMockState { validators: Arc::new(RwLock::new(vec![])), force_timeout: Arc::new(RwLock::new(true)), }; - let server_handle = create_mock_server(port, Some(state)).await?; + let server_handle = create_mock_ssv_server(port, Some(state)).await?; let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) .unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; @@ -119,11 +119,11 @@ async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { #[tokio::test] /// Tests that the SSV network fetch is handled properly when the response's /// content-length header is missing -async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> { +async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> { // Start the mock server let port = 30103; set_ignore_content_length(true); - let server_handle = create_mock_server(port, None).await?; + let server_handle = create_mock_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; @@ -226,32 +226,3 @@ async fn test_mux() -> Result<()> { Ok(()) } - -/* -#[tokio::test] -#[tracing_test::traced_test] -async fn test_auto_refresh() -> Result<()> { - // This test reads the log files to verify behavior, so we can't attach a global - // trace listener setup_test_env(); - - let signer = random_secret(); - let pubkey = signer.public_key(); - - let chain = Chain::Hoodi; - let pbs_port = 3710; - - // Start the mock SSV API server - let ssv_api_port = 3711; - let ssv_api_url = format!("http://localhost:{ssv_api_port}"); - let mock_ssv_state = SsvMockState { - validators: Arc::new(RwLock::new(vec![pubkey])), - force_timeout: Arc::new(RwLock::new(false)), - }; - - // Run PBS service - let state = PbsState::new(config); - tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - - Ok(()) -} -*/ diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs new file mode 100644 index 00000000..185d57f1 --- /dev/null +++ b/tests/tests/pbs_mux_refresh.rs @@ -0,0 +1,115 @@ +use std::{sync::Arc, time::Duration}; + +use cb_common::{ + config::{MuxConfig, MuxKeysLoader, PbsMuxes}, + interop::ssv::types::SSVValidator, + signer::random_secret, + types::Chain, +}; +use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; +use cb_tests::{ + mock_relay::{MockRelayState, start_mock_relay_service}, + mock_ssv::{SsvMockState, create_mock_ssv_server}, + utils::{generate_mock_relay, get_pbs_static_config, to_pbs_config}, +}; +use eyre::Result; +use tokio::sync::RwLock; +use tracing::info; +use url::Url; + +#[tokio::test] +#[allow(unused_assignments)] +#[tracing_test::traced_test] +async fn test_auto_refresh() -> Result<()> { + // This test reads the log files to verify behavior, so we can't attach a global + // trace listener setup_test_env(); + + let signer = random_secret(); + let pubkey = signer.public_key(); + + let chain = Chain::Hoodi; + let pbs_port = 3710; + + // Start the mock SSV API server + let mut next_port = 
pbs_port + 1; + let ssv_api_port = next_port; + next_port += 1; + let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?; + let mock_ssv_state = SsvMockState { + validators: Arc::new(RwLock::new(vec![SSVValidator { pubkey: pubkey.clone() }])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_server_handle = + create_mock_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; + + // Start a mock relay to be used by the mux + let default_relay = generate_mock_relay(next_port, pubkey.clone())?; + let relay_id = default_relay.id.clone().to_string(); + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), next_port)); + next_port += 1; + + // Create the registry mux + let loader = MuxKeysLoader::Registry { + enable_refreshing: true, + node_operator_id: 1, + registry: cb_common::config::NORegistry::SSV, + }; + let muxes = PbsMuxes { + muxes: vec![MuxConfig { + id: relay_id.clone(), + loader: Some(loader), + late_in_slot_time_ms: Some(2000), + relays: vec![(*default_relay.config).clone()], + timeout_get_header_ms: Some(750), + validator_pubkeys: vec![], + }], + }; + + // Set up the PBS config + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.ssv_api_url = Some(ssv_api_url.clone()); + pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second + let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; + let relays = vec![default_relay.clone()]; + let mut config = to_pbs_config(chain, pbs_config, relays); + config.all_relays = vec![default_relay.clone()]; + config.mux_lookup = Some(mux_lookup); + config.registry_muxes = Some(registry_muxes); + + // Run PBS service + let state = PbsState::new(config); + let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + info!("Started PBS server with pubkey {pubkey}"); + + // Wait for the first refresh to complete + let wait_for_refresh_time = Duration::from_secs(2); + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure a refresh happened + assert!(logs_contain(&format!("fetched 1 pubkeys for registry mux {relay_id}"))); + assert!(!logs_contain(&format!("fetched 2 pubkeys for registry mux {relay_id}"))); + assert!(!logs_contain("adding new pubkey")); + + // Add another validator + let new_secret = random_secret(); + let new_pubkey = new_secret.public_key(); + { + let mut validators = mock_ssv_state.validators.write().await; + validators.push(SSVValidator { pubkey: new_pubkey.clone() }); + info!("Added new validator {new_pubkey} to the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure the new pubkey was added + assert!(logs_contain(&format!("adding new pubkey {new_pubkey} to mux {relay_id}"))); + assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {relay_id}"))); + + // Shut down the server handles + pbs_server.abort(); + ssv_server_handle.abort(); + + Ok(()) +} From f4cd8c24964b88d7e5b08bf394dae1ea060e6592 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 7 Oct 2025 11:19:47 -0400 Subject: [PATCH 07/21] Clippy formatting --- config.example.toml | 3 +++ tests/src/mock_ssv.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/config.example.toml b/config.example.toml index 237336e4..1659275a 100644 --- a/config.example.toml +++ b/config.example.toml @@ -55,6 +55,9 @@ extra_validation_enabled = 
false # Execution Layer RPC url to use for extra validation # OPTIONAL # rpc_url = "https://ethereum-holesky-rpc.publicnode.com" +# URL of the SSV API server to use, if you have a mux that targets an SSV node operator +# OPTIONAL +# ssv_api_url = "https://api.ssv.network/api/v4" # Timeout for any HTTP requests sent from the PBS module to other services, in seconds # OPTIONAL, DEFAULT: 10 http_timeout_seconds = 10 diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs index 00b4de26..3fa12546 100644 --- a/tests/src/mock_ssv.rs +++ b/tests/src/mock_ssv.rs @@ -49,7 +49,7 @@ pub async fn create_mock_ssv_server( }); let result = Ok(tokio::spawn(async move { if let Err(e) = server.await { - eprintln!("Server error: {}", e); + eprintln!("Server error: {e}"); } })); info!("Mock server started on http://localhost:{port}/"); From bccc5bb09e5172540d89ad3b77f0445fb10d423e Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 7 Oct 2025 16:43:00 -0400 Subject: [PATCH 08/21] Fixed some state setup prior to testing --- tests/tests/pbs_mux_refresh.rs | 95 ++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 26 deletions(-) diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 185d57f1..a38215cc 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -10,9 +10,11 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{MockRelayState, start_mock_relay_service}, mock_ssv::{SsvMockState, create_mock_ssv_server}, + mock_validator::MockValidator, utils::{generate_mock_relay, get_pbs_static_config, to_pbs_config}, }; use eyre::Result; +use reqwest::StatusCode; use tokio::sync::RwLock; use tracing::info; use url::Url; @@ -24,30 +26,46 @@ async fn test_auto_refresh() -> Result<()> { // This test reads the log files to verify behavior, so we can't attach a global // trace listener setup_test_env(); - let signer = random_secret(); - let pubkey = signer.public_key(); + // Generate 3 keys: one not in the mux relay, one in the relay, and one that + // hasn't been added yet but will be later. The existing key isn't used but is + // needed in the initial config since CB won't start a mux without at least one + // key. 
+ let default_signer = random_secret(); + let default_pubkey = default_signer.public_key(); + let existing_mux_signer = random_secret(); + let existing_mux_pubkey = existing_mux_signer.public_key(); + let new_mux_signer = random_secret(); + let new_mux_pubkey = new_mux_signer.public_key(); let chain = Chain::Hoodi; let pbs_port = 3710; // Start the mock SSV API server - let mut next_port = pbs_port + 1; - let ssv_api_port = next_port; - next_port += 1; + let ssv_api_port = pbs_port + 1; let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?; let mock_ssv_state = SsvMockState { - validators: Arc::new(RwLock::new(vec![SSVValidator { pubkey: pubkey.clone() }])), + validators: Arc::new(RwLock::new(vec![SSVValidator { + pubkey: existing_mux_pubkey.clone(), + }])), force_timeout: Arc::new(RwLock::new(false)), }; let ssv_server_handle = create_mock_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; + // Start a default relay for non-mux keys + let default_relay_port = ssv_api_port + 1; + let default_relay = generate_mock_relay(default_relay_port, default_pubkey.clone())?; + let default_relay_state = Arc::new(MockRelayState::new(chain, default_signer.clone())); + let default_relay_task = + tokio::spawn(start_mock_relay_service(default_relay_state.clone(), default_relay_port)); + // Start a mock relay to be used by the mux - let default_relay = generate_mock_relay(next_port, pubkey.clone())?; - let relay_id = default_relay.id.clone().to_string(); - let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), next_port)); - next_port += 1; + let mux_relay_port = default_relay_port + 1; + let mux_relay = generate_mock_relay(mux_relay_port, default_pubkey.clone())?; + let mux_relay_id = mux_relay.id.clone().to_string(); + let mux_relay_state = Arc::new(MockRelayState::new(chain, default_signer)); + let mux_relay_task = + tokio::spawn(start_mock_relay_service(mux_relay_state.clone(), mux_relay_port)); // Create the registry mux let loader = MuxKeysLoader::Registry { @@ -57,11 +75,11 @@ async fn test_auto_refresh() -> Result<()> { }; let muxes = PbsMuxes { muxes: vec![MuxConfig { - id: relay_id.clone(), + id: mux_relay_id.clone(), loader: Some(loader), - late_in_slot_time_ms: Some(2000), - relays: vec![(*default_relay.config).clone()], - timeout_get_header_ms: Some(750), + late_in_slot_time_ms: Some(u64::MAX), + relays: vec![(*mux_relay.config).clone()], + timeout_get_header_ms: Some(u64::MAX - 1), validator_pubkeys: vec![], }], }; @@ -71,45 +89,70 @@ async fn test_auto_refresh() -> Result<()> { pbs_config.ssv_api_url = Some(ssv_api_url.clone()); pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; - let relays = vec![default_relay.clone()]; + let relays = vec![default_relay.clone()]; // Default relay only let mut config = to_pbs_config(chain, pbs_config, relays); - config.all_relays = vec![default_relay.clone()]; + config.all_relays.push(mux_relay.clone()); // Add the mux relay to just this field config.mux_lookup = Some(mux_lookup); config.registry_muxes = Some(registry_muxes); // Run PBS service let state = PbsState::new(config); let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); - info!("Started PBS server with pubkey {pubkey}"); + info!("Started PBS server with pubkey {default_pubkey}"); + + // Wait for the server to start + 
tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to run a get_header on the new pubkey, which should use the default + // relay only since it hasn't been seen in the mux yet + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used // Wait for the first refresh to complete let wait_for_refresh_time = Duration::from_secs(2); tokio::time::sleep(wait_for_refresh_time).await; // Check the logs to ensure a refresh happened - assert!(logs_contain(&format!("fetched 1 pubkeys for registry mux {relay_id}"))); - assert!(!logs_contain(&format!("fetched 2 pubkeys for registry mux {relay_id}"))); + assert!(logs_contain(&format!("fetched 1 pubkeys for registry mux {mux_relay_id}"))); + assert!(!logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); assert!(!logs_contain("adding new pubkey")); // Add another validator - let new_secret = random_secret(); - let new_pubkey = new_secret.public_key(); { let mut validators = mock_ssv_state.validators.write().await; - validators.push(SSVValidator { pubkey: new_pubkey.clone() }); - info!("Added new validator {new_pubkey} to the SSV mock server"); + validators.push(SSVValidator { pubkey: new_mux_pubkey.clone() }); + info!("Added new validator {new_mux_pubkey} to the SSV mock server"); } // Wait for the next refresh to complete tokio::time::sleep(wait_for_refresh_time).await; // Check the logs to ensure the new pubkey was added - assert!(logs_contain(&format!("adding new pubkey {new_pubkey} to mux {relay_id}"))); - assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {relay_id}"))); + assert!(logs_contain(&format!("adding new pubkey {new_mux_pubkey} to mux {mux_relay_id}"))); + assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); + + // Try to run a get_header on the new pubkey - now it should use the mux relay + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used + + // Finally try to do a get_header with the old pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used // Shut down the server handles pbs_server.abort(); ssv_server_handle.abort(); + default_relay_task.abort(); + mux_relay_task.abort(); Ok(()) } From d7e4ae81c70d7944de144cd062f53c92b989ff7e Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 15:22:55 -0400 Subject: [PATCH 09/21] Clarified the example config --- config.example.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/config.example.toml b/config.example.toml index 1659275a..4f10861e 100644 --- a/config.example.toml +++ b/config.example.toml @@ -69,7 +69,10 @@ register_validator_retry_limit = 3 validator_registration_batch_size = "" # For any Registry-based Mux configurations that have 
dynamic pubkey
 # refreshing enabled, this is how often to refresh the list of pubkeys
-# from the registry, in seconds
+# from the registry, in seconds. Enabling registry refreshing is done per-mux
+# with the mux's `enable_refreshing` property. If none of the muxes have it
+# enabled, this value will not be used.
+# OPTIONAL, DEFAULT: 384
 mux_registry_refresh_interval_seconds = 384
 
 # The PBS module needs one or more [[relays]] as defined below.

From ef64d9d889ace513e945950b7e28c553726e124a Mon Sep 17 00:00:00 2001
From: Joe Clapis
Date: Tue, 14 Oct 2025 15:26:03 -0400
Subject: [PATCH 10/21] Removed the Option from the SSV API URL

---
 crates/common/src/config/mux.rs | 6 +-----
 crates/common/src/config/pbs.rs | 6 +++---
 tests/src/utils.rs              | 3 ++-
 tests/tests/pbs_mux_refresh.rs  | 2 +-
 4 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs
index bbaff08c..8602cba4 100644
--- a/crates/common/src/config/mux.rs
+++ b/crates/common/src/config/mux.rs
@@ -211,7 +211,7 @@ impl MuxKeysLoader {
         &self,
         mux_id: &str,
         chain: Chain,
-        ssv_api_url: Option<Url>,
+        ssv_api_url: Url,
         rpc_url: Option<Url>,
         http_timeout: Duration,
     ) -> eyre::Result<Vec<BlsPublicKey>> {
@@ -254,10 +254,6 @@ impl MuxKeysLoader {
                 .await
             }
             NORegistry::SSV => {
-                let Some(ssv_api_url) = ssv_api_url else {
-                    bail!("SSV registry requires SSV API URL to be set in the PBS config");
-                };
-
                 fetch_ssv_pubkeys(
                     ssv_api_url,
                     chain,
diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs
index 27d4cfc0..b413da56 100644
--- a/crates/common/src/config/pbs.rs
+++ b/crates/common/src/config/pbs.rs
@@ -125,7 +125,7 @@ pub struct PbsConfig {
     pub rpc_url: Option<Url>,
     /// URL for the SSV network API
     #[serde(default = "default_ssv_api_url")]
-    pub ssv_api_url: Option<Url>,
+    pub ssv_api_url: Url,
     /// Timeout for HTTP requests in seconds
     #[serde(default = "default_u64::<HTTP_TIMEOUT_SECONDS_DEFAULT>")]
     pub http_timeout_seconds: u64,
@@ -398,6 +398,6 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC
 }
 
 /// Default URL for the SSV network API
-fn default_ssv_api_url() -> Option<Url> {
-    Some(Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid"))
+fn default_ssv_api_url() -> Url {
+    Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid")
 }
diff --git a/tests/src/utils.rs b/tests/src/utils.rs
index af4f80c3..58ef42cf 100644
--- a/tests/src/utils.rs
+++ b/tests/src/utils.rs
@@ -17,6 +17,7 @@ use cb_common::{
     utils::{bls_pubkey_from_hex, default_host},
 };
 use eyre::Result;
+use url::Url;
 
 pub fn get_local_address(port: u16) -> String {
     format!("http://0.0.0.0:{port}")
@@ -78,7 +79,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
         min_bid_wei: U256::ZERO,
         late_in_slot_time_ms: u64::MAX,
         extra_validation_enabled: false,
-        ssv_api_url: None,
+        ssv_api_url: Url::parse("https://example.net").unwrap(),
         rpc_url: None,
         http_timeout_seconds: 10,
         register_validator_retry_limit: u32::MAX,
diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs
index a38215cc..2c025a7a 100644
--- a/tests/tests/pbs_mux_refresh.rs
+++ b/tests/tests/pbs_mux_refresh.rs
@@ -86,7 +86,7 @@ async fn test_auto_refresh() -> Result<()> {
 
     // Set up the PBS config
     let mut pbs_config = get_pbs_static_config(pbs_port);
-    pbs_config.ssv_api_url = Some(ssv_api_url.clone());
+    pbs_config.ssv_api_url = ssv_api_url.clone();
     pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second
     let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?;
     let 
relays = vec![default_relay.clone()]; // Default relay only From 3ec5bb40e8bfe51f03fb7fc8b50605fe20fc5671 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 15:44:46 -0400 Subject: [PATCH 11/21] Setting the registry refresh interval to 0 now errors out --- crates/pbs/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index b8a1ffeb..0b016204 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -31,7 +31,7 @@ impl PbsService { let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; let mut is_refreshing_required = false; if state.config.pbs_config.mux_registry_refresh_interval_seconds == 0 { - info!("registry mux refreshing interval is 0; refreshing is disabled"); + bail!("registry mux refreshing interval must be greater than 0"); } else if let Some(muxes) = &state.config.registry_muxes { is_refreshing_required = muxes.iter().any(|(loader, _)| { matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. }) From e9a14d573001dd3b3e30864a51a8e098652583c7 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 16:01:50 -0400 Subject: [PATCH 12/21] Clarified registry uniqueness in the example config --- config.example.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config.example.toml b/config.example.toml index 4f10861e..5f55ba8a 100644 --- a/config.example.toml +++ b/config.example.toml @@ -137,6 +137,10 @@ validator_pubkeys = [ # - Lido: NodeOperatorsRegistry # - SSV: SSV API # You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. +# Each of these registry entries must be unique: +# - There can only be one Lido entry with a given Lido node operator ID. +# - There can only be one SSV entry with a given SSV node operator ID. +# - A Lido entry can have the same node operator ID as an SSV entry if they happen to coincide; they're treated as separate entities. 
# # Example JSON list: # [ From e60a1f8ebbe53e64c8a1089f90fe4e9e941b9d29 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 16:06:39 -0400 Subject: [PATCH 13/21] Removed some verbosity from the new pubkey message --- crates/pbs/src/service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 0b016204..f9138d01 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -140,7 +140,8 @@ impl PbsService { } // Log the new pubkeys for (pubkey, runtime_config) in new_pubkeys.iter() { - info!("adding new pubkey {pubkey} to mux {}", runtime_config.id); + debug!("adding new pubkey {pubkey} to mux {}", runtime_config.id); + info!("discovered new pubkey {pubkey} from a registry"); } { // Since config isn't an RwLock, the option with the least amount of code churn From c5534c743b2f9b10fd372dcdd79e1f0acfe8a0be Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 17:29:09 -0400 Subject: [PATCH 14/21] Added removed key support to registry refreshing --- crates/pbs/src/service.rs | 67 +++++++++++++++++++++++++++------- tests/tests/pbs_mux_refresh.rs | 23 +++++++++++- 2 files changed, 75 insertions(+), 15 deletions(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index f9138d01..6ad155fb 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use cb_common::{ config::{MuxKeysLoader, PbsModuleConfig}, @@ -82,6 +86,7 @@ impl PbsService { async fn refresh_registry_muxes(state: PbsStateGuard) { // Read-only portion let mut new_pubkeys = HashMap::new(); + let mut removed_pubkeys = HashSet::new(); { let state = state.read().await; let config = &state.config; @@ -121,9 +126,26 @@ impl PbsService { ); // Add any new pubkeys to the new lookup table + let mut pubkey_set = HashSet::new(); for pubkey in pubkeys { - if mux_lookup.get(&pubkey).is_none() { - new_pubkeys.insert(pubkey, runtime_config.clone()); + pubkey_set.insert(pubkey.clone()); + match mux_lookup.get(&pubkey) { + Some(_) => { + // Pubkey already existed + } + None => { + // New pubkey + new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); + } + } + } + + // Find any pubkeys that were removed + for (pubkey, existing_runtime) in mux_lookup.iter() { + if existing_runtime.id == runtime_config.id && + !pubkey_set.contains(pubkey) + { + removed_pubkeys.insert(pubkey.clone()); } } } @@ -134,15 +156,27 @@ impl PbsService { } } + // Report changes + let mut no_new_changes = true; + if !new_pubkeys.is_empty() { + no_new_changes = false; + info!("discovered {} new pubkeys from registries", new_pubkeys.len()); + for (pubkey, runtime_config) in new_pubkeys.iter() { + debug!("adding new pubkey {pubkey} to mux {}", runtime_config.id); + } + } + if !removed_pubkeys.is_empty() { + no_new_changes = false; + info!("registries have removed {} old pubkeys", removed_pubkeys.len()); + for pubkey in removed_pubkeys.iter() { + debug!("removing old pubkey {pubkey} from mux lookup"); + } + } + // Write portion - if new_pubkeys.is_empty() { + if no_new_changes { return; } - // Log the new pubkeys - for (pubkey, runtime_config) in new_pubkeys.iter() { - debug!("adding new pubkey {pubkey} to mux {}", runtime_config.id); - info!("discovered new pubkey {pubkey} from a registry"); - } { // Since config isn't an RwLock, the option with the least amount of code churn // 
is to just clone the whole config and replace the mux_lookup @@ -150,12 +184,17 @@ impl PbsService { // operation. let mut state = state.write().await; let config = state.config.as_ref(); - let new_mux_lookup = match &config.mux_lookup { - Some(existing) => { - new_pubkeys.extend(existing.iter().map(|(k, v)| (k.clone(), v.clone()))); - new_pubkeys + let new_mux_lookup = if let Some(existing) = &config.mux_lookup { + let mut map = HashMap::new(); + for (k, v) in existing.iter() { + if !removed_pubkeys.contains(k) { + map.insert(k.clone(), v.clone()); + } } - None => new_pubkeys, + map.extend(new_pubkeys); + map + } else { + new_pubkeys }; state.config = Arc::new(PbsModuleConfig { mux_lookup: Some(new_mux_lookup), ..config.clone() }); diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 2c025a7a..024f20b1 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -141,13 +141,34 @@ async fn test_auto_refresh() -> Result<()> { assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used - // Finally try to do a get_header with the old pubkey - it should only use the + // Now try to do a get_header with the old pubkey - it should only use the // default relay let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + // Finally, remove the original mux pubkey from the SSV server + assert!(!logs_contain(&format!("removing old pubkey {existing_mux_pubkey} from mux lookup"))); + { + let mut validators = mock_ssv_state.validators.write().await; + validators.retain(|v| v.pubkey != existing_mux_pubkey); + info!("Removed existing validator {existing_mux_pubkey} from the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure the existing pubkey was removed + assert!(logs_contain(&format!("removing old pubkey {existing_mux_pubkey} from mux lookup"))); + + // Try to do a get_header with the removed pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + // Shut down the server handles pbs_server.abort(); ssv_server_handle.abort(); From 6c788f428f4ce937d4a1eba596a96f8507c6433f Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 17:38:55 -0400 Subject: [PATCH 15/21] Moved registry refresh interval into validation --- crates/common/src/config/pbs.rs | 5 +++++ crates/pbs/src/service.rs | 4 +--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index b413da56..c2c30d2f 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -191,6 +191,11 @@ impl PbsConfig { } } + ensure!( + self.mux_registry_refresh_interval_seconds > 0, + "registry mux refreshing interval must be greater than 0" + ); + Ok(()) } } diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 6ad155fb..7cb82703 100644 --- a/crates/pbs/src/service.rs 
+++ b/crates/pbs/src/service.rs @@ -34,9 +34,7 @@ impl PbsService { // Check if refreshing registry muxes is required let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; let mut is_refreshing_required = false; - if state.config.pbs_config.mux_registry_refresh_interval_seconds == 0 { - bail!("registry mux refreshing interval must be greater than 0"); - } else if let Some(muxes) = &state.config.registry_muxes { + if let Some(muxes) = &state.config.registry_muxes { is_refreshing_required = muxes.iter().any(|(loader, _)| { matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. }) }); From f7fa708bea45ddd151421792ab23cd702c2d91ba Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Tue, 14 Oct 2025 17:45:54 -0400 Subject: [PATCH 16/21] Made the default SSV URL more clear in the example config --- config.example.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.example.toml b/config.example.toml index 5f55ba8a..2bcd0efe 100644 --- a/config.example.toml +++ b/config.example.toml @@ -56,7 +56,7 @@ extra_validation_enabled = false # OPTIONAL # rpc_url = "https://ethereum-holesky-rpc.publicnode.com" # URL of the SSV API server to use, if you have a mux that targets an SSV node operator -# OPTIONAL +# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4" # ssv_api_url = "https://api.ssv.network/api/v4" # Timeout for any HTTP requests sent from the PBS module to other services, in seconds # OPTIONAL, DEFAULT: 10 From 978c573de22c2deae5b84f24d826fa8e5356fe37 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 20 Oct 2025 08:11:59 -0400 Subject: [PATCH 17/21] Update crates/pbs/src/service.rs Co-authored-by: ltitanb <163874448+ltitanb@users.noreply.github.com> --- crates/pbs/src/service.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 7cb82703..fcfa03f2 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -33,12 +33,11 @@ impl PbsService { // Check if refreshing registry muxes is required let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; - let mut is_refreshing_required = false; - if let Some(muxes) = &state.config.registry_muxes { - is_refreshing_required = muxes.iter().any(|(loader, _)| { + llet is_refreshing_required = state.config.registry_muxes.as_ref().is_some_and(|muxes| { + muxes.iter().any(|(loader, _)| { matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. 
}) - }); - } + }) + }); let state: Arc>> = RwLock::new(state).into(); let app = create_app_router::(state.clone()); From 3651350858473d261b8f03cd7332998f06b81abf Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 20 Oct 2025 08:13:01 -0400 Subject: [PATCH 18/21] Update crates/pbs/src/service.rs Co-authored-by: ltitanb <163874448+ltitanb@users.noreply.github.com> --- crates/pbs/src/service.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index fcfa03f2..a25658cf 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -126,14 +126,10 @@ impl PbsService { let mut pubkey_set = HashSet::new(); for pubkey in pubkeys { pubkey_set.insert(pubkey.clone()); - match mux_lookup.get(&pubkey) { - Some(_) => { - // Pubkey already existed - } - None => { - // New pubkey - new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); - } + if mux_lookup.get(&pubkey).is_none() { + // New pubkey + new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); + } } } From 9bf1ea9dc609e8301c2642ba864ccac3a819926d Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 20 Oct 2025 08:16:49 -0400 Subject: [PATCH 19/21] Fixed some suggestion typos --- crates/pbs/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index a25658cf..5d732143 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -33,7 +33,7 @@ impl PbsService { // Check if refreshing registry muxes is required let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; - llet is_refreshing_required = state.config.registry_muxes.as_ref().is_some_and(|muxes| { + let is_refreshing_required = state.config.registry_muxes.as_ref().is_some_and(|muxes| { muxes.iter().any(|(loader, _)| { matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. 
}) }) @@ -130,7 +130,6 @@ impl PbsService { // New pubkey new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); } - } } // Find any pubkeys that were removed From c9587a23ba573f8f03b594f75677e4215add5701 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 20 Oct 2025 09:17:44 -0400 Subject: [PATCH 20/21] Removed verbose logging --- crates/pbs/src/service.rs | 6 ------ tests/tests/pbs_mux_refresh.rs | 5 ----- 2 files changed, 11 deletions(-) diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 5d732143..0c630c4c 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -153,16 +153,10 @@ impl PbsService { if !new_pubkeys.is_empty() { no_new_changes = false; info!("discovered {} new pubkeys from registries", new_pubkeys.len()); - for (pubkey, runtime_config) in new_pubkeys.iter() { - debug!("adding new pubkey {pubkey} to mux {}", runtime_config.id); - } } if !removed_pubkeys.is_empty() { no_new_changes = false; info!("registries have removed {} old pubkeys", removed_pubkeys.len()); - for pubkey in removed_pubkeys.iter() { - debug!("removing old pubkey {pubkey} from mux lookup"); - } } // Write portion diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 024f20b1..431fd84d 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -132,7 +132,6 @@ async fn test_auto_refresh() -> Result<()> { tokio::time::sleep(wait_for_refresh_time).await; // Check the logs to ensure the new pubkey was added - assert!(logs_contain(&format!("adding new pubkey {new_mux_pubkey} to mux {mux_relay_id}"))); assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); // Try to run a get_header on the new pubkey - now it should use the mux relay @@ -149,7 +148,6 @@ async fn test_auto_refresh() -> Result<()> { assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used // Finally, remove the original mux pubkey from the SSV server - assert!(!logs_contain(&format!("removing old pubkey {existing_mux_pubkey} from mux lookup"))); { let mut validators = mock_ssv_state.validators.write().await; validators.retain(|v| v.pubkey != existing_mux_pubkey); @@ -159,9 +157,6 @@ async fn test_auto_refresh() -> Result<()> { // Wait for the next refresh to complete tokio::time::sleep(wait_for_refresh_time).await; - // Check the logs to ensure the existing pubkey was removed - assert!(logs_contain(&format!("removing old pubkey {existing_mux_pubkey} from mux lookup"))); - // Try to do a get_header with the removed pubkey - it should only use the // default relay let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; From 59311fba9510680d900fff89c20d6830f8968452 Mon Sep 17 00:00:00 2001 From: Joe Clapis Date: Mon, 20 Oct 2025 17:28:24 -0400 Subject: [PATCH 21/21] Changed tokio RwLock to parking_lot --- crates/pbs/src/routes/get_header.rs | 4 ++-- crates/pbs/src/routes/register_validator.rs | 4 ++-- crates/pbs/src/routes/reload.rs | 6 +++--- crates/pbs/src/routes/status.rs | 4 ++-- crates/pbs/src/routes/submit_block.rs | 4 ++-- crates/pbs/src/service.rs | 14 +++++++++++--- crates/pbs/src/state.rs | 2 +- examples/status_api/src/main.rs | 5 +---- 8 files changed, 24 insertions(+), 19 deletions(-) diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 4f849a69..9ed312af 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -28,14 +28,14 @@ pub async fn handle_get_header>( 
tracing::Span::current().record("parent_hash", tracing::field::debug(params.parent_hash)); tracing::Span::current().record("validator", tracing::field::debug(¶ms.pubkey)); - let state = state.read().await.clone(); + let state = state.read().clone(); let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); info!(ua, ms_into_slot, "new request"); - match A::get_header(params, req_headers, state.clone()).await { + match A::get_header(params, req_headers, state).await { Ok(res) => { if let Some(max_bid) = res { info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header"); diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index ba5b3466..51c8ce6e 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -16,7 +16,7 @@ pub async fn handle_register_validator>( req_headers: HeaderMap, Json(registrations): Json>, ) -> Result { - let state = state.read().await.clone(); + let state = state.read().clone(); trace!(?registrations); @@ -24,7 +24,7 @@ pub async fn handle_register_validator>( info!(ua, num_registrations = registrations.len(), "new request"); - if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { + if let Err(err) = A::register_validator(registrations, req_headers, state).await { error!(%err, "all relays failed registration"); let err = PbsClientError::NoResponse; diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index 4f7e6d5e..aa031d47 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -14,17 +14,17 @@ pub async fn handle_reload>( req_headers: HeaderMap, State(state): State>, ) -> Result { - let prev_state = state.read().await.clone(); + let prev_state = state.read().clone(); let ua = get_user_agent(&req_headers); info!(ua, relay_check = prev_state.config.pbs_config.relay_check); - match A::reload(prev_state.clone()).await { + match A::reload(prev_state).await { Ok(new_state) => { info!("config reload successful"); - *state.write().await = new_state; + *state.write() = new_state; BEACON_NODE_STATUS.with_label_values(&["200", RELOAD_ENDPOINT_TAG]).inc(); Ok((StatusCode::OK, "OK")) diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index b6ac0f91..52fd3e2f 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -15,13 +15,13 @@ pub async fn handle_get_status>( req_headers: HeaderMap, State(state): State>, ) -> Result { - let state = state.read().await.clone(); + let state = state.read().clone(); let ua = get_user_agent(&req_headers); info!(ua, relay_check = state.config.pbs_config.relay_check, "new request"); - match A::get_status(req_headers, state.clone()).await { + match A::get_status(req_headers, state).await { Ok(_) => { info!("relay check successful"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index f336e487..4784e6b1 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -55,7 +55,7 @@ async fn handle_submit_block_impl>( tracing::Span::current() .record("parent_hash", tracing::field::debug(signed_blinded_block.parent_hash())); - let state = state.read().await.clone(); + let state = state.read().clone(); let now = utcnow_ms(); let slot = signed_blinded_block.slot(); @@ -65,7 +65,7 @@ async fn handle_submit_block_impl>( info!(ua, 
ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); - match A::submit_block(signed_blinded_block, req_headers, state.clone(), &api_version).await { + match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await { Ok(res) => match res { Some(block_response) => { trace!(?block_response); diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 0c630c4c..6659ae85 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -12,8 +12,9 @@ use cb_common::{ }; use cb_metrics::provider::MetricsProvider; use eyre::{Context, Result, bail}; +use parking_lot::RwLock; use prometheus::core::Collector; -use tokio::{net::TcpListener, sync::RwLock}; +use tokio::net::TcpListener; use tracing::{debug, info, warn}; use url::Url; @@ -61,9 +62,16 @@ impl PbsService { // Run the registry refresher task if is_refreshing_required { let mut interval = tokio::time::interval(Duration::from_secs(registry_refresh_time)); + let state = state.clone(); tokio::spawn(async move { + let mut is_first_tick = true; loop { interval.tick().await; + if is_first_tick { + // Don't run immediately on the first tick, since it was just initialized + is_first_tick = false; + continue; + } Self::refresh_registry_muxes(state.clone()).await; } }); @@ -85,7 +93,7 @@ impl PbsService { let mut new_pubkeys = HashMap::new(); let mut removed_pubkeys = HashSet::new(); { - let state = state.read().await; + let state = state.read().clone(); let config = &state.config; // Short circuit if there aren't any registry muxes with dynamic refreshing @@ -168,7 +176,7 @@ impl PbsService { // is to just clone the whole config and replace the mux_lookup // field. Cloning the config may be expensive, but this should be a fairly rare // operation. - let mut state = state.write().await; + let mut state = state.write(); let config = state.config.as_ref(); let new_mux_lookup = if let Some(existing) = &config.mux_lookup { let mut map = HashMap::new(); diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 63b3027a..dd0e118e 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -5,7 +5,7 @@ use cb_common::{ pbs::RelayClient, types::BlsPublicKey, }; -use tokio::sync::RwLock; +use parking_lot::RwLock; pub trait BuilderApiState: Clone + Sync + Send + 'static {} impl BuilderApiState for () {} diff --git a/examples/status_api/src/main.rs b/examples/status_api/src/main.rs index 748fd77f..7ad9b533 100644 --- a/examples/status_api/src/main.rs +++ b/examples/status_api/src/main.rs @@ -81,10 +81,7 @@ impl BuilderApi for MyBuilderApi { } async fn handle_check(State(state): State>) -> Response { - ( - StatusCode::OK, - format!("Received {count} status requests!", count = state.read().await.data.get()), - ) + (StatusCode::OK, format!("Received {count} status requests!", count = state.read().data.get())) .into_response() }
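
Two details in the last patch are worth spelling out. First, tokio::time::interval completes its first tick immediately, which is why the refresh task tracks is_first_tick and skips it; awaiting interval.tick() once before entering the loop would have the same effect. Second, parking_lot::RwLock guards are synchronous and must not be held across an .await, which is why each handler now clones the state out of a short-lived read lock before doing any async work. Below is a minimal sketch of that locking pattern, not the real service code: AppState is a hypothetical stand-in for PbsState, and only tokio and parking_lot are assumed as dependencies.

use std::sync::Arc;

use parking_lot::RwLock;

#[derive(Clone)]
struct AppState {
    refresh_count: u64,
}

async fn handle_request(state: Arc<RwLock<AppState>>) -> u64 {
    // Take a snapshot under a short synchronous read lock. The guard is a
    // temporary that is dropped at the end of this statement, so it is never
    // held across an .await (parking_lot guards are !Send, and a future
    // holding one across an .await could not be passed to tokio::spawn).
    let snapshot = state.read().clone();

    // All async work happens against the snapshot, not the lock.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    snapshot.refresh_count
}

async fn refresh(state: Arc<RwLock<AppState>>) {
    // Writers update under an equally short write lock; the guard is dropped
    // before any subsequent .await.
    state.write().refresh_count += 1;
}

#[tokio::main]
async fn main() {
    let state = Arc::new(RwLock::new(AppState { refresh_count: 0 }));
    refresh(state.clone()).await;
    assert_eq!(handle_request(state).await, 1);
}

The write side mirrors refresh_registry_muxes: rather than mutating shared maps in place while readers may be mid-request, it builds the new value first and swaps it in under a brief write lock, so readers only ever observe complete snapshots.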