diff --git a/Cargo.lock b/Cargo.lock index e3fc4fa8..906a8507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1732,6 +1732,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "tracing-test", "tree_hash", "types", "url", @@ -2747,7 +2748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -5451,7 +5452,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6271,7 +6272,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.0", ] [[package]] @@ -6809,6 +6810,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.106", +] + [[package]] name = "tree_hash" version = "0.10.0" diff --git a/config.example.toml b/config.example.toml index ad0c5340..2bcd0efe 100644 --- a/config.example.toml +++ b/config.example.toml @@ -55,6 +55,9 @@ extra_validation_enabled = false # Execution Layer RPC url to use for extra validation # OPTIONAL # rpc_url = "https://ethereum-holesky-rpc.publicnode.com" +# URL of the SSV API server to use, if you have a mux that targets an SSV node operator +# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4" +# ssv_api_url = "https://api.ssv.network/api/v4" # Timeout for any HTTP requests sent from the PBS module to other services, in seconds # OPTIONAL, DEFAULT: 10 http_timeout_seconds = 10 @@ -64,6 +67,13 @@ register_validator_retry_limit = 3 # Maximum number of validators to register in a single request. # OPTIONAL, DEFAULT: "" (unlimited) validator_registration_batch_size = "" +# For any Registry-based Mux configurations that have dynamic pubkey +# refreshing enabled, this is how often to refresh the list of pubkeys +# from the registry, in seconds. Enabling registry refreshing is done per-mux +# with the mux's `enable_refreshing` property. If none of the muxes have it +# enabled, this value will not be used. +# OPTIONAL, DEFAULT: 384 +mux_registry_refresh_interval_seconds = 384 # The PBS module needs one or more [[relays]] as defined below. [[relays]] @@ -126,6 +136,11 @@ validator_pubkeys = [ # - Registry: details of a registry to load keys from. Supported registries: # - Lido: NodeOperatorsRegistry # - SSV: SSV API +# You can toggle the 'enable_refreshing' flag to let this registry periodically query Lido or SSV and refresh the list of validator pubkeys belonging to the corresponding operator. +# Each of these registry entries must be unique: +# - There can only be one Lido entry with a given Lido node operator ID. +# - There can only be one SSV entry with a given SSV node operator ID. +# - A Lido entry can have the same node operator ID as an SSV entry if they happen to coincide; they're treated as separate entities. 
# # Example JSON list: # [ @@ -135,8 +150,8 @@ validator_pubkeys = [ # OPTIONAL loader = "./tests/data/mux_keys.example.json" # loader = { url = "http://localhost:8000/keys" } -# loader = { registry = "lido", node_operator_id = 8 } -# loader = { registry = "ssv", node_operator_id = 8 } +# loader = { registry = "lido", node_operator_id = 8, enable_refreshing = false } +# loader = { registry = "ssv", node_operator_id = 8, enable_refreshing = false } late_in_slot_time_ms = 1500 timeout_get_header_ms = 900 # For each mux, one or more [[mux.relays]] can be defined, which will be used for the matching validator pubkeys diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index ea5c0199..57a5fbb3 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -5,6 +5,9 @@ publish = false rust-version.workspace = true version.workspace = true +[features] +testing-flags = [] + [dependencies] aes.workspace = true alloy.workspace = true diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs index 346eaf78..8602cba4 100644 --- a/crates/common/src/config/mux.rs +++ b/crates/common/src/config/mux.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, HashSet}, + hash::Hash, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -14,18 +15,20 @@ use alloy::{ }; use eyre::{Context, bail, ensure}; use reqwest::Client; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use tracing::{debug, info, warn}; use url::Url; use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var}; use crate::{ config::{remove_duplicate_keys, safe_read_http_response}, + interop::ssv::utils::fetch_ssv_pubkeys_from_url, pbs::RelayClient, types::{BlsPublicKey, Chain}, + utils::default_bool, }; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PbsMuxes { /// List of PBS multiplexers #[serde(rename = "mux")] @@ -44,7 +47,10 @@ impl PbsMuxes { self, chain: Chain, default_pbs: &PbsConfig, - ) -> eyre::Result<HashMap<BlsPublicKey, RuntimeMuxConfig>> { + ) -> eyre::Result<( + HashMap<BlsPublicKey, RuntimeMuxConfig>, + HashMap<MuxKeysLoader, RuntimeMuxConfig>, + )> { let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); let mut muxes = self.muxes; @@ -53,8 +59,15 @@ impl PbsMuxes { ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id); if let Some(loader) = &mux.loader { - let extra_keys = - loader.load(&mux.id, chain, default_pbs.rpc_url.clone(), http_timeout).await?; + let extra_keys = loader + .load( + &mux.id, + chain, + default_pbs.ssv_api_url.clone(), + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await?; mux.validator_pubkeys.extend(extra_keys); } @@ -76,6 +89,7 @@ impl PbsMuxes { } let mut configs = HashMap::new(); + let mut registry_muxes = HashMap::new(); // fill the configs using the default pbs config and relay entries for mux in muxes { info!( @@ -102,18 +116,30 @@ impl PbsMuxes { config.validate(chain).await?; let config = Arc::new(config); + // Build the map of pubkeys to mux configs let runtime_config = RuntimeMuxConfig { id: mux.id, config, relays: relay_clients }; for pubkey in mux.validator_pubkeys.into_iter() { configs.insert(pubkey, runtime_config.clone()); } + + // Track registry muxes with refreshing enabled + if let Some(loader) = &mux.loader && + let MuxKeysLoader::Registry { enable_refreshing: true, .. 
} = loader + { + info!( + "mux {} uses registry loader with dynamic refreshing enabled", + runtime_config.id + ); + registry_muxes.insert(loader.clone(), runtime_config.clone()); + } } - Ok(configs) + Ok((configs, registry_muxes)) } } /// Configuration for the PBS Multiplexer -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct MuxConfig { /// Identifier for this mux config pub id: String, @@ -156,7 +182,7 @@ impl MuxConfig { } } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] #[serde(untagged)] pub enum MuxKeysLoader { /// A file containing a list of validator pubkeys @@ -167,10 +193,12 @@ pub enum MuxKeysLoader { Registry { registry: NORegistry, node_operator_id: u64, + #[serde(default = "default_bool::")] + enable_refreshing: bool, }, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)] pub enum NORegistry { #[serde(alias = "lido")] Lido, @@ -183,6 +211,7 @@ impl MuxKeysLoader { &self, mux_id: &str, chain: Chain, + ssv_api_url: Url, rpc_url: Option, http_timeout: Duration, ) -> eyre::Result> { @@ -210,7 +239,7 @@ impl MuxKeysLoader { .wrap_err("failed to fetch mux keys from HTTP endpoint") } - Self::Registry { registry, node_operator_id } => match registry { + Self::Registry { registry, node_operator_id, enable_refreshing: _ } => match registry { NORegistry::Lido => { let Some(rpc_url) = rpc_url else { bail!("Lido registry requires RPC URL to be set in the PBS config"); @@ -225,7 +254,13 @@ impl MuxKeysLoader { .await } NORegistry::SSV => { - fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await + fetch_ssv_pubkeys( + ssv_api_url, + chain, + U256::from(*node_operator_id), + http_timeout, + ) + .await } }, }?; @@ -334,6 +369,7 @@ async fn fetch_lido_registry_keys( } async fn fetch_ssv_pubkeys( + api_url: Url, chain: Chain, node_operator_id: U256, http_timeout: Duration, @@ -351,11 +387,12 @@ async fn fetch_ssv_pubkeys( let mut page = 1; loop { - let url = format!( - "https://api.ssv.network/api/v4/{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", + let route = format!( + "{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}", ); + let url = api_url.join(&route).wrap_err("failed to construct SSV API URL")?; - let response = fetch_ssv_pubkeys_from_url(&url, http_timeout).await?; + let response = fetch_ssv_pubkeys_from_url(url, http_timeout).await?; let fetched = response.validators.len(); pubkeys.extend( response.validators.into_iter().map(|v| v.pubkey).collect::>(), @@ -376,74 +413,12 @@ async fn fetch_ssv_pubkeys( Ok(pubkeys) } -async fn fetch_ssv_pubkeys_from_url( - url: &str, - http_timeout: Duration, -) -> eyre::Result { - let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; - let response = client.get(url).send().await.map_err(|e| { - if e.is_timeout() { - eyre::eyre!("Request to SSV network API timed out: {e}") - } else { - eyre::eyre!("Error sending request to SSV network API: {e}") - } - })?; - - // Parse the response as JSON - let body_bytes = safe_read_http_response(response).await?; - serde_json::from_slice::(&body_bytes).wrap_err("failed to parse SSV response") -} - -#[derive(Deserialize)] -struct SSVResponse { - validators: Vec, - pagination: SSVPagination, -} - -struct SSVValidator { - pubkey: BlsPublicKey, -} - -impl<'de> Deserialize<'de> for SSVValidator { - fn 
deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct SSVValidator { - public_key: String, - } - - let s = SSVValidator::deserialize(deserializer)?; - let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; - let pubkey = BlsPublicKey::deserialize(&bytes) - .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; - - Ok(Self { pubkey }) - } -} - -#[derive(Deserialize)] -struct SSVPagination { - total: usize, -} - #[cfg(test)] mod tests { - use std::net::SocketAddr; - use alloy::{primitives::U256, providers::ProviderBuilder}; - use axum::{response::Response, routing::get}; - use tokio::{net::TcpListener, task::JoinHandle}; use url::Url; use super::*; - use crate::{ - config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH}, - utils::{ResponseReadError, bls_pubkey_from_hex_unchecked, set_ignore_content_length}, - }; - - const TEST_HTTP_TIMEOUT: u64 = 2; #[tokio::test] async fn test_lido_registry_address() -> eyre::Result<()> { @@ -479,185 +454,4 @@ mod tests { Ok(()) } - - #[tokio::test] - /// Tests that a successful SSV network fetch is handled and parsed properly - async fn test_ssv_network_fetch() -> eyre::Result<()> { - // Start the mock server - let port = 30100; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/ssv"); - let response = - fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) - .await?; - - // Make sure the response is correct - // NOTE: requires that ssv_data.json dpesn't change - assert_eq!(response.validators.len(), 3); - let expected_pubkeys = [ - bls_pubkey_from_hex_unchecked( - "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a", - ), - bls_pubkey_from_hex_unchecked( - "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c", - ), - bls_pubkey_from_hex_unchecked( - "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639", - ), - ]; - for (i, validator) in response.validators.iter().enumerate() { - assert_eq!(validator.pubkey, expected_pubkeys[i]); - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// body is too large - async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> { - // Start the mock server - let port = 30101; - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; - - // The response should fail due to content length being too big - match response { - Ok(_) => { - panic!("Expected an error due to big content length, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - assert!(*content_length > MUXER_HTTP_MAX_LENGTH); - assert!(raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the request - /// times out - async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> { - // Start the mock server - let port = 30102; - let 
_server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/timeout"); - let response = - fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; - - // The response should fail due to timeout - assert!(response.is_err(), "Expected timeout error, but got success"); - if let Err(e) = response { - assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e); - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - #[tokio::test] - /// Tests that the SSV network fetch is handled properly when the response's - /// content-length header is missing - async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> { - // Start the mock server - let port = 30103; - set_ignore_content_length(true); - let _server_handle = create_mock_server(port).await?; - let url = format!("http://localhost:{port}/big_data"); - let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await; - - // The response should fail due to the body being too big - match response { - Ok(_) => { - panic!("Expected an error due to excessive data, but got a successful response") - } - Err(e) => match e.downcast_ref::() { - Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { - assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); - assert_eq!(*content_length, 0); - assert!(!raw.is_empty()); - } - _ => panic!("Expected PayloadTooLarge error, got: {}", e), - }, - } - - // Clean up the server handle - _server_handle.abort(); - - Ok(()) - } - - /// Creates a simple mock server to simulate the SSV API endpoint under - /// various conditions for testing - async fn create_mock_server(port: u16) -> Result, axum::Error> { - let router = axum::Router::new() - .route("/ssv", get(handle_ssv)) - .route("/big_data", get(handle_big_data)) - .route("/timeout", get(handle_timeout)) - .into_make_service(); - - let address = SocketAddr::from(([127, 0, 0, 1], port)); - let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; - let server = axum::serve(listener, router).with_graceful_shutdown(async { - tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); - }); - let result = Ok(tokio::spawn(async move { - if let Err(e) = server.await { - eprintln!("Server error: {}", e); - } - })); - info!("Mock server started on http://localhost:{port}/"); - result - } - - /// Sends the good SSV JSON data to the client - async fn handle_ssv() -> Response { - // Read the JSON data - let data = include_str!("../../../../tests/data/ssv_valid.json"); - - // Create a valid response - Response::builder() - .status(200) - .header("Content-Type", "application/json") - .body(data.into()) - .unwrap() - } - - /// Sends a response with a large body - larger than the maximum allowed. - /// Note that hyper overwrites the content-length header automatically, so - /// setting it here wouldn't actually change the value that ultimately - /// gets sent to the server. 
- async fn handle_big_data() -> Response { - let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH); - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body(body.into()) - .unwrap() - } - - /// Simulates a timeout by sleeping for a long time - async fn handle_timeout() -> Response { - // Sleep for a long time to simulate a timeout - tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await; - Response::builder() - .status(200) - .header("Content-Type", "application/text") - .body("Timeout response".into()) - .unwrap() - } } diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 3e2c0a14..c2c30d2f 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -21,12 +21,12 @@ use super::{ use crate::{ commit::client::SignerClient, config::{ - CONFIG_ENV, MODULE_JWT_ENV, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, load_env_var, - load_file_from_env, + CONFIG_ENV, MODULE_JWT_ENV, MuxKeysLoader, PBS_MODULE_NAME, PbsMuxes, SIGNER_URL_ENV, + load_env_var, load_file_from_env, }, pbs::{ - DEFAULT_PBS_PORT, DefaultTimeout, LATE_IN_SLOT_TIME_MS, REGISTER_VALIDATOR_RETRY_LIMIT, - RelayClient, RelayEntry, + DEFAULT_PBS_PORT, DEFAULT_REGISTRY_REFRESH_SECONDS, DefaultTimeout, LATE_IN_SLOT_TIME_MS, + REGISTER_VALIDATOR_RETRY_LIMIT, RelayClient, RelayEntry, }, types::{BlsPublicKey, Chain, Jwt, ModuleId}, utils::{ @@ -123,6 +123,9 @@ pub struct PbsConfig { pub extra_validation_enabled: bool, /// Execution Layer RPC url to use for extra validation pub rpc_url: Option<Url>, + /// URL for the SSV network API + #[serde(default = "default_ssv_api_url")] + pub ssv_api_url: Url, /// Timeout for HTTP requests in seconds #[serde(default = "default_u64::<HTTP_TIMEOUT_SECONDS_DEFAULT>")] pub http_timeout_seconds: u64, @@ -133,6 +136,11 @@ pub struct PbsConfig { /// request #[serde(deserialize_with = "empty_string_as_none", default)] pub validator_registration_batch_size: Option<usize>, + /// For any Registry-based Mux configurations that have dynamic pubkey + /// refreshing enabled, this is how often to refresh the list of pubkeys + /// from the registry, in seconds + #[serde(default = "default_u64::<{ DEFAULT_REGISTRY_REFRESH_SECONDS }>")] + pub mux_registry_refresh_interval_seconds: u64, } impl PbsConfig { @@ -183,6 +191,11 @@ impl PbsConfig { } } + ensure!( + self.mux_registry_refresh_interval_seconds > 0, + "registry mux refreshing interval must be greater than 0" + ); + Ok(()) } } @@ -213,13 +226,15 @@ pub struct PbsModuleConfig { /// List of default relays pub relays: Vec<RelayClient>, /// List of all default relays plus additional relays from muxes (based on - /// URL) DO NOT use this for get_header calls, use `relays` or `muxes` + /// URL) DO NOT use this for get_header calls, use `relays` or `mux_lookup` /// instead pub all_relays: Vec<RelayClient>, /// Signer client to call Signer API pub signer_client: Option<SignerClient>, - /// Muxes config - pub muxes: Option<HashMap<BlsPublicKey, RuntimeMuxConfig>>, + /// List of raw mux details configured, if any + pub registry_muxes: Option<HashMap<MuxKeysLoader, RuntimeMuxConfig>>, + /// Lookup of pubkey to mux config + pub mux_lookup: Option<HashMap<BlsPublicKey, RuntimeMuxConfig>>, } fn default_pbs() -> String { @@ -246,19 +261,23 @@ pub async fn load_pbs_config() -> Result<PbsModuleConfig> { SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port)) }; - let muxes = match config.muxes { Some(muxes) => { let mux_configs = muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; Some(mux_configs) } None => None, }; - + // Get the list of relays from the default config let relay_clients = 
config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = + muxes.validate_and_fill(config.chain, &config.pbs.pbs_config).await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -283,7 +302,8 @@ pub async fn load_pbs_config() -> Result { relays: relay_clients, all_relays, signer_client: None, - muxes, + registry_muxes, + mux_lookup, }) } @@ -319,20 +339,24 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC )) }; - let muxes = match cb_config.muxes { - Some(muxes) => Some( - muxes - .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) - .await?, - ), - None => None, - }; - + // Get the list of relays from the default config let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let mut all_relays = HashMap::with_capacity(relay_clients.len()); - if let Some(muxes) = &muxes { + // Validate the muxes and build the lookup tables + let (mux_lookup, registry_muxes) = match cb_config.muxes { + Some(muxes) => { + let (mux_lookup, registry_muxes) = muxes + .validate_and_fill(cb_config.chain, &cb_config.pbs.static_config.pbs_config) + .await?; + (Some(mux_lookup), Some(registry_muxes)) + } + None => (None, None), + }; + + // Build the list of all relays, starting with muxes + if let Some(muxes) = &mux_lookup { for (_, mux) in muxes.iter() { for relay in mux.relays.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); @@ -371,8 +395,14 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC relays: relay_clients, all_relays, signer_client, - muxes, + registry_muxes, + mux_lookup, }, cb_config.pbs.extra, )) } + +/// Default URL for the SSV network API +fn default_ssv_api_url() -> Url { + Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid") +} diff --git a/crates/common/src/interop/mod.rs b/crates/common/src/interop/mod.rs new file mode 100644 index 00000000..42502f6f --- /dev/null +++ b/crates/common/src/interop/mod.rs @@ -0,0 +1 @@ +pub mod ssv; diff --git a/crates/common/src/interop/ssv/mod.rs b/crates/common/src/interop/ssv/mod.rs new file mode 100644 index 00000000..b4ab6a6a --- /dev/null +++ b/crates/common/src/interop/ssv/mod.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod utils; diff --git a/crates/common/src/interop/ssv/types.rs b/crates/common/src/interop/ssv/types.rs new file mode 100644 index 00000000..b8ac2e23 --- /dev/null +++ b/crates/common/src/interop/ssv/types.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::types::BlsPublicKey; + +/// Response from the SSV API for validators +#[derive(Deserialize, Serialize)] +pub struct SSVResponse { + /// List of validators returned by the SSV API + pub validators: Vec, + + /// Pagination information + pub pagination: SSVPagination, +} + +/// Representation of a validator in the SSV API +#[derive(Clone)] +pub struct SSVValidator { + /// The public key of the validator + pub pubkey: BlsPublicKey, +} + +impl<'de> Deserialize<'de> for SSVValidator { + fn deserialize(deserializer: D) -> Result + where + 
D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator::deserialize(deserializer)?; + let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?; + let pubkey = BlsPublicKey::deserialize(&bytes) + .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?; + + Ok(Self { pubkey }) + } +} + +impl Serialize for SSVValidator { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + #[derive(Serialize)] + struct SSVValidator { + public_key: String, + } + + let s = SSVValidator { public_key: self.pubkey.as_hex_string() }; + s.serialize(serializer) + } +} + +/// Pagination information from the SSV API +#[derive(Deserialize, Serialize)] +pub struct SSVPagination { + /// Total number of validators available + pub total: usize, +} diff --git a/crates/common/src/interop/ssv/utils.rs b/crates/common/src/interop/ssv/utils.rs new file mode 100644 index 00000000..e443e018 --- /dev/null +++ b/crates/common/src/interop/ssv/utils.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use eyre::Context; +use url::Url; + +use crate::{config::safe_read_http_response, interop::ssv::types::SSVResponse}; + +pub async fn fetch_ssv_pubkeys_from_url( + url: Url, + http_timeout: Duration, +) -> eyre::Result<SSVResponse> { + let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?; + let response = client.get(url).send().await.map_err(|e| { + if e.is_timeout() { + eyre::eyre!("Request to SSV network API timed out: {e}") + } else { + eyre::eyre!("Error sending request to SSV network API: {e}") + } + })?; + + // Parse the response as JSON + let body_bytes = safe_read_http_response(response).await?; + serde_json::from_slice::<SSVResponse>(&body_bytes).wrap_err("failed to parse SSV response") +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 1fe1f26a..462dcec1 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -3,6 +3,7 @@ use std::time::Duration; pub mod commit; pub mod config; pub mod constants; +pub mod interop; pub mod pbs; pub mod signature; pub mod signer; diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index 533da137..bbe20b0d 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -35,3 +35,5 @@ pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; // Maximum number of retries for validator registration request per relay pub const REGISTER_VALIDATOR_RETRY_LIMIT: u32 = 3; + +pub const DEFAULT_REGISTRY_REFRESH_SECONDS: u64 = 12 * 32; // One epoch diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index d972477b..764ab188 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -1,4 +1,4 @@ -#[cfg(test)] +#[cfg(feature = "testing-flags")] use std::cell::Cell; use std::{ net::Ipv4Addr, @@ -43,17 +43,18 @@ pub enum ResponseReadError { ReqwestError(#[from] reqwest::Error), } -#[cfg(test)] +#[cfg(feature = "testing-flags")] thread_local! 
{ static IGNORE_CONTENT_LENGTH: Cell = const { Cell::new(false) }; } -#[cfg(test)] +#[cfg(feature = "testing-flags")] pub fn set_ignore_content_length(val: bool) { IGNORE_CONTENT_LENGTH.with(|f| f.set(val)); } -#[cfg(test)] +#[cfg(feature = "testing-flags")] +#[allow(dead_code)] fn should_ignore_content_length() -> bool { IGNORE_CONTENT_LENGTH.with(|f| f.get()) } @@ -65,13 +66,13 @@ pub async fn read_chunked_body_with_max( max_size: usize, ) -> Result, ResponseReadError> { // Get the content length from the response headers - #[cfg(not(test))] + #[cfg(not(feature = "testing-flags"))] let content_length = res.content_length(); - #[cfg(test)] + #[cfg(feature = "testing-flags")] let mut content_length = res.content_length(); - #[cfg(test)] + #[cfg(feature = "testing-flags")] if should_ignore_content_length() { // Used for testing purposes to ignore content length content_length = None; diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 98a411f7..9ed312af 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -35,7 +35,7 @@ pub async fn handle_get_header>( info!(ua, ms_into_slot, "new request"); - match A::get_header(params, req_headers, state.clone()).await { + match A::get_header(params, req_headers, state).await { Ok(res) => { if let Some(max_bid) = res { info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header"); diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index dad85d27..51c8ce6e 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -24,7 +24,7 @@ pub async fn handle_register_validator>( info!(ua, num_registrations = registrations.len(), "new request"); - if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { + if let Err(err) = A::register_validator(registrations, req_headers, state).await { error!(%err, "all relays failed registration"); let err = PbsClientError::NoResponse; diff --git a/crates/pbs/src/routes/reload.rs b/crates/pbs/src/routes/reload.rs index 86c09273..aa031d47 100644 --- a/crates/pbs/src/routes/reload.rs +++ b/crates/pbs/src/routes/reload.rs @@ -20,7 +20,7 @@ pub async fn handle_reload>( info!(ua, relay_check = prev_state.config.pbs_config.relay_check); - match A::reload(prev_state.clone()).await { + match A::reload(prev_state).await { Ok(new_state) => { info!("config reload successful"); diff --git a/crates/pbs/src/routes/status.rs b/crates/pbs/src/routes/status.rs index d0f68ac7..52fd3e2f 100644 --- a/crates/pbs/src/routes/status.rs +++ b/crates/pbs/src/routes/status.rs @@ -21,7 +21,7 @@ pub async fn handle_get_status>( info!(ua, relay_check = state.config.pbs_config.relay_check, "new request"); - match A::get_status(req_headers, state.clone()).await { + match A::get_status(req_headers, state).await { Ok(_) => { info!("relay check successful"); diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index b3e98553..4784e6b1 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -65,7 +65,7 @@ async fn handle_submit_block_impl>( info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); - match A::submit_block(signed_blinded_block, req_headers, state.clone(), &api_version).await { + match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await { Ok(res) => match res { 
Some(block_response) => { trace!(?block_response); diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs index 16afb6a7..6659ae85 100644 --- a/crates/pbs/src/service.rs +++ b/crates/pbs/src/service.rs @@ -1,6 +1,11 @@ -use std::time::Duration; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use cb_common::{ + config::{MuxKeysLoader, PbsModuleConfig}, constants::{COMMIT_BOOST_COMMIT, COMMIT_BOOST_VERSION}, pbs::{BUILDER_V1_API_PATH, GET_STATUS_PATH}, types::Chain, @@ -10,14 +15,14 @@ use eyre::{Context, Result, bail}; use parking_lot::RwLock; use prometheus::core::Collector; use tokio::net::TcpListener; -use tracing::info; +use tracing::{debug, info, warn}; use url::Url; use crate::{ api::BuilderApi, metrics::PBS_METRICS_REGISTRY, routes::create_app_router, - state::{BuilderApiState, PbsState}, + state::{BuilderApiState, PbsState, PbsStateGuard}, }; pub struct PbsService; @@ -27,7 +32,16 @@ impl PbsService { let addr = state.config.endpoint; info!(version = COMMIT_BOOST_VERSION, commit_hash = COMMIT_BOOST_COMMIT, ?addr, chain =? state.config.chain, "starting PBS service"); - let app = create_app_router::(RwLock::new(state).into()); + // Check if refreshing registry muxes is required + let registry_refresh_time = state.config.pbs_config.mux_registry_refresh_interval_seconds; + let is_refreshing_required = state.config.registry_muxes.as_ref().is_some_and(|muxes| { + muxes.iter().any(|(loader, _)| { + matches!(loader, MuxKeysLoader::Registry { enable_refreshing: true, .. }) + }) + }); + + let state: Arc>> = RwLock::new(state).into(); + let app = create_app_router::(state.clone()); let listener = TcpListener::bind(addr).await?; let task = @@ -45,6 +59,24 @@ impl PbsService { bail!("PBS server failed to start. Are the relays properly configured?"); } + // Run the registry refresher task + if is_refreshing_required { + let mut interval = tokio::time::interval(Duration::from_secs(registry_refresh_time)); + let state = state.clone(); + tokio::spawn(async move { + let mut is_first_tick = true; + loop { + interval.tick().await; + if is_first_tick { + // Don't run immediately on the first tick, since it was just initialized + is_first_tick = false; + continue; + } + Self::refresh_registry_muxes(state.clone()).await; + } + }); + } + task.await? 
} @@ -55,4 +87,111 @@ impl PbsService { pub fn init_metrics(network: Chain) -> Result<()> { MetricsProvider::load_and_run(network, PBS_METRICS_REGISTRY.clone()) } + + async fn refresh_registry_muxes(state: PbsStateGuard) { + // Read-only portion + let mut new_pubkeys = HashMap::new(); + let mut removed_pubkeys = HashSet::new(); + { + let state = state.read().clone(); + let config = &state.config; + + // Short circuit if there aren't any registry muxes with dynamic refreshing + let registry_muxes = match &config.registry_muxes { + Some(muxes) => muxes, + None => return, + }; + + // Initialize an empty lookup if the config doesn't have one yet + let mux_lookup = match &config.mux_lookup { + Some(lookup) => lookup, + None => &HashMap::new(), + }; + + // Go through each registry mux and refresh its pubkeys + let default_pbs = &config.pbs_config; + let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds); + for (loader, runtime_config) in registry_muxes.iter() { + debug!("refreshing pubkeys for registry mux {}", runtime_config.id); + match loader + .load( + &runtime_config.id, + config.chain, + default_pbs.ssv_api_url.clone(), + default_pbs.rpc_url.clone(), + http_timeout, + ) + .await + { + Ok(pubkeys) => { + debug!( + "fetched {} pubkeys for registry mux {}", + pubkeys.len(), + runtime_config.id + ); + + // Add any new pubkeys to the new lookup table + let mut pubkey_set = HashSet::new(); + for pubkey in pubkeys { + pubkey_set.insert(pubkey.clone()); + if mux_lookup.get(&pubkey).is_none() { + // New pubkey + new_pubkeys.insert(pubkey.clone(), runtime_config.clone()); + } + } + + // Find any pubkeys that were removed + for (pubkey, existing_runtime) in mux_lookup.iter() { + if existing_runtime.id == runtime_config.id && + !pubkey_set.contains(pubkey) + { + removed_pubkeys.insert(pubkey.clone()); + } + } + } + Err(err) => { + warn!(%err, "failed to refresh pubkeys for registry mux {}", runtime_config.id); + } + } + } + } + + // Report changes + let mut no_new_changes = true; + if !new_pubkeys.is_empty() { + no_new_changes = false; + info!("discovered {} new pubkeys from registries", new_pubkeys.len()); + } + if !removed_pubkeys.is_empty() { + no_new_changes = false; + info!("registries have removed {} old pubkeys", removed_pubkeys.len()); + } + + // Write portion + if no_new_changes { + return; + } + { + // Since config isn't an RwLock, the option with the least amount of code churn + // is to just clone the whole config and replace the mux_lookup + // field. Cloning the config may be expensive, but this should be a fairly rare + // operation. 
+ let mut state = state.write(); + let config = state.config.as_ref(); + let new_mux_lookup = if let Some(existing) = &config.mux_lookup { + let mut map = HashMap::new(); + for (k, v) in existing.iter() { + if !removed_pubkeys.contains(k) { + map.insert(k.clone(), v.clone()); + } + } + map.extend(new_pubkeys); + map + } else { + new_pubkeys + }; + state.config = + Arc::new(PbsModuleConfig { mux_lookup: Some(new_mux_lookup), ..config.clone() }); + } + } } diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 4403b348..dd0e118e 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -56,7 +56,7 @@ where &self, pubkey: &BlsPublicKey, ) -> (&PbsConfig, &[RelayClient], Option<&str>) { - match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { + match self.config.mux_lookup.as_ref().and_then(|muxes| muxes.get(pubkey)) { Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)), // return only the default relays if there's no match None => (self.pbs_config(), &self.config.relays, None), diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 3195cc11..5e8e1596 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -18,5 +18,9 @@ tempfile.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +tracing-test.workspace = true tree_hash.workspace = true url.workspace = true + +[dev-dependencies] +cb-common = { path = "../crates/common", features = ["testing-flags"] } \ No newline at end of file diff --git a/tests/src/lib.rs b/tests/src/lib.rs index a4fbbb6a..42e36a8e 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,3 +1,4 @@ pub mod mock_relay; +pub mod mock_ssv; pub mod mock_validator; pub mod utils; diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv.rs new file mode 100644 index 00000000..3fa12546 --- /dev/null +++ b/tests/src/mock_ssv.rs @@ -0,0 +1,108 @@ +use std::{net::SocketAddr, sync::Arc}; + +use axum::{ + extract::{Path, State}, + response::Response, + routing::get, +}; +use cb_common::{ + config::MUXER_HTTP_MAX_LENGTH, + interop::ssv::types::{SSVPagination, SSVResponse, SSVValidator}, +}; +use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; +use tracing::info; + +pub const TEST_HTTP_TIMEOUT: u64 = 2; + +/// State for the mock server +#[derive(Clone)] +pub struct SsvMockState { + /// List of pubkeys for the mock server to return + pub validators: Arc<RwLock<Vec<SSVValidator>>>, + + /// Whether to force a timeout response to simulate a server error + pub force_timeout: Arc<RwLock<bool>>, +} + +/// Creates a simple mock server to simulate the SSV API endpoint under +/// various conditions for testing. 
Note this ignores the chain name and node operator ID path parameters. +pub async fn create_mock_ssv_server( + port: u16, + state: Option<SsvMockState>, +) -> Result<JoinHandle<()>, axum::Error> { + let data = include_str!("../../tests/data/ssv_valid.json"); + let response = serde_json::from_str::<SSVResponse>(data).expect("failed to parse test data"); + let state = state.unwrap_or(SsvMockState { + validators: Arc::new(RwLock::new(response.validators)), + force_timeout: Arc::new(RwLock::new(false)), + }); + let router = axum::Router::new() + .route("/{chain_name}/validators/in_operator/{node_operator_id}", get(handle_validators)) + .route("/big_data", get(handle_big_data)) + .with_state(state) + .into_make_service(); + + let address = SocketAddr::from(([127, 0, 0, 1], port)); + let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; + let server = axum::serve(listener, router).with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); + }); + let result = Ok(tokio::spawn(async move { + if let Err(e) = server.await { + eprintln!("Server error: {e}"); + } + })); + info!("Mock server started on http://localhost:{port}/"); + result +} + +/// Returns a valid SSV validators response, or a timeout if requested in +/// the server state +async fn handle_validators( + State(state): State<SsvMockState>, + Path((_, _)): Path<(String, u64)>, +) -> Response { + // Time out if requested + if *state.force_timeout.read().await { + return handle_timeout().await; + } + + // Generate the response based on the current validators + let response: SSVResponse; + { + let validators = state.validators.read().await; + let pagination = SSVPagination { total: validators.len() }; + response = SSVResponse { validators: validators.clone(), pagination }; + } + + // Create a valid response + Response::builder() + .status(200) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&response).unwrap().into()) + .unwrap() +} + +/// Sends a response with a large body - larger than the maximum allowed. +/// Note that hyper overwrites the content-length header automatically, so +/// setting it here wouldn't actually change the value that ultimately +/// gets sent to the server. 
+async fn handle_big_data() -> Response { + let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH); + Response::builder() + .status(200) + .header("Content-Type", "application/text") + .body(body.into()) + .unwrap() +} + +/// Simulates a timeout by sleeping for a long time +async fn handle_timeout() -> Response { + // Sleep for a long time to simulate a timeout + tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await; + Response::builder() + .status(200) + .header("Content-Type", "application/text") + .body("Timeout response".into()) + .unwrap() +} diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 1c83e0eb..58ef42cf 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -14,9 +14,10 @@ use cb_common::{ pbs::{RelayClient, RelayEntry}, signer::SignerLoader, types::{BlsPublicKey, Chain, ModuleId}, - utils::default_host, + utils::{bls_pubkey_from_hex, default_host}, }; use eyre::Result; +use url::Url; pub fn get_local_address(port: u16) -> String { format!("http://0.0.0.0:{port}") @@ -78,10 +79,12 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, extra_validation_enabled: false, + ssv_api_url: Url::parse("https://example.net").unwrap(), rpc_url: None, http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, validator_registration_batch_size: None, + mux_registry_refresh_interval_seconds: 5, } } @@ -97,7 +100,8 @@ pub fn to_pbs_config( signer_client: None, all_relays: relays.clone(), relays, - muxes: None, + registry_muxes: None, + mux_lookup: None, } } @@ -131,3 +135,7 @@ pub fn get_start_signer_config( _ => panic!("Only local signers are supported in tests"), } } + +pub fn bls_pubkey_from_hex_unchecked(hex: &str) -> BlsPublicKey { + bls_pubkey_from_hex(hex).unwrap() +} diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 28a3d369..5cde1158 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -1,15 +1,152 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use cb_common::{config::RuntimeMuxConfig, signer::random_secret, types::Chain}; +use cb_common::{ + config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, RuntimeMuxConfig}, + interop::ssv::utils::fetch_ssv_pubkeys_from_url, + signer::random_secret, + types::Chain, + utils::{ResponseReadError, set_ignore_content_length}, +}; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{MockRelayState, start_mock_relay_service}, + mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_ssv_server}, mock_validator::MockValidator, - utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config}, + utils::{ + bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_static_config, setup_test_env, + to_pbs_config, + }, }; use eyre::Result; use reqwest::StatusCode; +use tokio::sync::RwLock; use tracing::info; +use url::Url; + +#[tokio::test] +/// Tests that a successful SSV network fetch is handled and parsed properly +async fn test_ssv_network_fetch() -> Result<()> { + // Start the mock server + let port = 30100; + let _server_handle = create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); + let response = + fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?; + + // Make sure the response is correct + // NOTE: requires that ssv_valid.json doesn't change + assert_eq!(response.validators.len(), 3); + let 
expected_pubkeys = [ + bls_pubkey_from_hex_unchecked( + "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a", + ), + bls_pubkey_from_hex_unchecked( + "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c", + ), + bls_pubkey_from_hex_unchecked( + "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639", + ), + ]; + for (i, validator) in response.validators.iter().enumerate() { + assert_eq!(validator.pubkey, expected_pubkeys[i]); + } + + // Clean up the server handle + _server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the response's +/// body is too large +async fn test_ssv_network_fetch_big_data() -> Result<()> { + // Start the mock server + let port = 30101; + let server_handle = cb_tests::mock_ssv::create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + + // The response should fail due to content length being too big + match response { + Ok(_) => { + panic!("Expected an error due to big content length, but got a successful response") + } + Err(e) => match e.downcast_ref::() { + Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { + assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); + assert!(*content_length > MUXER_HTTP_MAX_LENGTH); + assert!(raw.is_empty()); + } + _ => panic!("Expected PayloadTooLarge error, got: {}", e), + }, + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the request +/// times out +async fn test_ssv_network_fetch_timeout() -> Result<()> { + // Start the mock server + let port = 30102; + let state = SsvMockState { + validators: Arc::new(RwLock::new(vec![])), + force_timeout: Arc::new(RwLock::new(true)), + }; + let server_handle = create_mock_ssv_server(port, Some(state)).await?; + let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1")) + .unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; + + // The response should fail due to timeout + assert!(response.is_err(), "Expected timeout error, but got success"); + if let Err(e) = response { + assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e); + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +/// Tests that the SSV network fetch is handled properly when the response's +/// content-length header is missing +async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> { + // Start the mock server + let port = 30103; + set_ignore_content_length(true); + let server_handle = create_mock_ssv_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); + let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + + // The response should fail due to the body being too big + match response { + Ok(_) => { + panic!("Expected an error due to excessive data, but got a successful response") + } + Err(e) => match e.downcast_ref::() { + Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => { + assert_eq!(*max, MUXER_HTTP_MAX_LENGTH); + assert_eq!(*content_length, 0); + assert!(!raw.is_empty()); + } + _ 
=> panic!("Expected PayloadTooLarge error, got: {}", e), + }, + } + + // Clean up the server handle + server_handle.abort(); + + Ok(()) +} #[tokio::test] async fn test_mux() -> Result<()> { @@ -44,7 +181,7 @@ async fn test_mux() -> Result<()> { // Bind mux to a specific validator key let validator_pubkey = random_secret().public_key(); - config.muxes = Some(HashMap::from([(validator_pubkey.clone(), mux)])); + config.mux_lookup = Some(HashMap::from([(validator_pubkey.clone(), mux)])); // Run PBS service let state = PbsState::new(config); diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs new file mode 100644 index 00000000..431fd84d --- /dev/null +++ b/tests/tests/pbs_mux_refresh.rs @@ -0,0 +1,174 @@ +use std::{sync::Arc, time::Duration}; + +use cb_common::{ + config::{MuxConfig, MuxKeysLoader, PbsMuxes}, + interop::ssv::types::SSVValidator, + signer::random_secret, + types::Chain, +}; +use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; +use cb_tests::{ + mock_relay::{MockRelayState, start_mock_relay_service}, + mock_ssv::{SsvMockState, create_mock_ssv_server}, + mock_validator::MockValidator, + utils::{generate_mock_relay, get_pbs_static_config, to_pbs_config}, +}; +use eyre::Result; +use reqwest::StatusCode; +use tokio::sync::RwLock; +use tracing::info; +use url::Url; + +#[tokio::test] +#[allow(unused_assignments)] +#[tracing_test::traced_test] +async fn test_auto_refresh() -> Result<()> { + // This test reads the log files to verify behavior, so we can't attach a global + // trace listener setup_test_env(); + + // Generate 3 keys: one not in the mux relay, one in the relay, and one that + // hasn't been added yet but will be later. The existing key isn't used but is + // needed in the initial config since CB won't start a mux without at least one + // key. 
+ let default_signer = random_secret(); + let default_pubkey = default_signer.public_key(); + let existing_mux_signer = random_secret(); + let existing_mux_pubkey = existing_mux_signer.public_key(); + let new_mux_signer = random_secret(); + let new_mux_pubkey = new_mux_signer.public_key(); + + let chain = Chain::Hoodi; + let pbs_port = 3710; + + // Start the mock SSV API server + let ssv_api_port = pbs_port + 1; + let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?; + let mock_ssv_state = SsvMockState { + validators: Arc::new(RwLock::new(vec![SSVValidator { + pubkey: existing_mux_pubkey.clone(), + }])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_server_handle = + create_mock_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; + + // Start a default relay for non-mux keys + let default_relay_port = ssv_api_port + 1; + let default_relay = generate_mock_relay(default_relay_port, default_pubkey.clone())?; + let default_relay_state = Arc::new(MockRelayState::new(chain, default_signer.clone())); + let default_relay_task = + tokio::spawn(start_mock_relay_service(default_relay_state.clone(), default_relay_port)); + + // Start a mock relay to be used by the mux + let mux_relay_port = default_relay_port + 1; + let mux_relay = generate_mock_relay(mux_relay_port, default_pubkey.clone())?; + let mux_relay_id = mux_relay.id.clone().to_string(); + let mux_relay_state = Arc::new(MockRelayState::new(chain, default_signer)); + let mux_relay_task = + tokio::spawn(start_mock_relay_service(mux_relay_state.clone(), mux_relay_port)); + + // Create the registry mux + let loader = MuxKeysLoader::Registry { + enable_refreshing: true, + node_operator_id: 1, + registry: cb_common::config::NORegistry::SSV, + }; + let muxes = PbsMuxes { + muxes: vec![MuxConfig { + id: mux_relay_id.clone(), + loader: Some(loader), + late_in_slot_time_ms: Some(u64::MAX), + relays: vec![(*mux_relay.config).clone()], + timeout_get_header_ms: Some(u64::MAX - 1), + validator_pubkeys: vec![], + }], + }; + + // Set up the PBS config + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.ssv_api_url = ssv_api_url.clone(); + pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second + let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; + let relays = vec![default_relay.clone()]; // Default relay only + let mut config = to_pbs_config(chain, pbs_config, relays); + config.all_relays.push(mux_relay.clone()); // Add the mux relay to just this field + config.mux_lookup = Some(mux_lookup); + config.registry_muxes = Some(registry_muxes); + + // Run PBS service + let state = PbsState::new(config); + let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + info!("Started PBS server with pubkey {default_pubkey}"); + + // Wait for the server to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to run a get_header on the new pubkey, which should use the default + // relay only since it hasn't been seen in the mux yet + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used + + // Wait for the first refresh to complete + let wait_for_refresh_time = Duration::from_secs(2); 
+ tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure a refresh happened + assert!(logs_contain(&format!("fetched 1 pubkeys for registry mux {mux_relay_id}"))); + assert!(!logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); + assert!(!logs_contain("adding new pubkey")); + + // Add another validator + { + let mut validators = mock_ssv_state.validators.write().await; + validators.push(SSVValidator { pubkey: new_mux_pubkey.clone() }); + info!("Added new validator {new_mux_pubkey} to the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Check the logs to ensure the new pubkey was added + assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); + + // Try to run a get_header on the new pubkey - now it should use the mux relay + let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used + + // Now try to do a get_header with the old pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + + // Finally, remove the original mux pubkey from the SSV server + { + let mut validators = mock_ssv_state.validators.write().await; + validators.retain(|v| v.pubkey != existing_mux_pubkey); + info!("Removed existing validator {existing_mux_pubkey} from the SSV mock server"); + } + + // Wait for the next refresh to complete + tokio::time::sleep(wait_for_refresh_time).await; + + // Try to do a get_header with the removed pubkey - it should only use the + // default relay + let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used + assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used + + // Shut down the server handles + pbs_server.abort(); + ssv_server_handle.abort(); + default_relay_task.abort(); + mux_relay_task.abort(); + + Ok(()) +}