From 784e25b8b51991742ad5df855c8b556a77916e2b Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 16 Oct 2025 18:52:41 -0400 Subject: [PATCH] fix clippy, add unreachable_pub and other lints --- Cargo.toml | 9 + crates/rproxy/Cargo.toml | 3 + .../src/circuit_breaker/circuit_breaker.rs | 202 ---------- crates/rproxy/src/circuit_breaker/mod.rs | 204 +++++++++- .../config/{config_authrpc.rs => authrpc.rs} | 12 +- ..._circuit_breaker.rs => circuit_breaker.rs} | 4 +- crates/rproxy/src/config/config.rs | 206 ---------- .../{config_flashblocks.rs => flashblocks.rs} | 4 +- .../config/{config_logging.rs => logging.rs} | 4 +- .../config/{config_metrics.rs => metrics.rs} | 0 crates/rproxy/src/config/mod.rs | 244 +++++++++++- .../{config_proxy_http.rs => proxy_http.rs} | 0 .../{config_proxy_ws.rs => proxy_ws.rs} | 0 .../src/config/{config_rpc.rs => rpc.rs} | 12 +- .../src/config/{config_tls.rs => tls.rs} | 33 +- crates/rproxy/src/jrpc/jrpc.rs | 100 ----- crates/rproxy/src/jrpc/mod.rs | 102 ++++- ...{metrics_candlestick.rs => candlestick.rs} | 0 .../metrics/{metrics_labels.rs => labels.rs} | 0 crates/rproxy/src/metrics/metrics.rs | 369 ----------------- crates/rproxy/src/metrics/mod.rs | 375 +++++++++++++++++- crates/rproxy/src/proxy/mod.rs | 140 ++++++- crates/rproxy/src/proxy/proxy.rs | 138 ------- .../src/proxy_http/{proxy_http.rs => http.rs} | 213 +++++----- .../{proxy_http_inner.rs => inner.rs} | 0 ...http_inner_authrpc.rs => inner_authrpc.rs} | 7 +- .../{proxy_http_inner_rpc.rs => inner_rpc.rs} | 8 +- crates/rproxy/src/proxy_http/mod.rs | 21 +- ...proxy_ws_flashblocks.rs => flashblocks.rs} | 0 .../proxy_ws/{proxy_ws_inner.rs => inner.rs} | 0 crates/rproxy/src/proxy_ws/mod.rs | 12 +- .../src/proxy_ws/{proxy_ws.rs => ws.rs} | 160 ++++---- crates/rproxy/src/server/mod.rs | 237 ++++++++++- crates/rproxy/src/server/server.rs | 238 ----------- crates/rproxy/src/utils/utils_compression.rs | 4 +- crates/rproxy/src/utils/utils_http.rs | 2 +- crates/rproxy/src/utils/utils_loggable.rs | 11 +- crates/rproxy/src/utils/utils_op_stack.rs | 17 +- 38 files changed, 1512 insertions(+), 1579 deletions(-) delete mode 100644 crates/rproxy/src/circuit_breaker/circuit_breaker.rs rename crates/rproxy/src/config/{config_authrpc.rs => authrpc.rs} (96%) rename crates/rproxy/src/config/{config_circuit_breaker.rs => circuit_breaker.rs} (98%) delete mode 100644 crates/rproxy/src/config/config.rs rename crates/rproxy/src/config/{config_flashblocks.rs => flashblocks.rs} (98%) rename crates/rproxy/src/config/{config_logging.rs => logging.rs} (97%) rename crates/rproxy/src/config/{config_metrics.rs => metrics.rs} (100%) rename crates/rproxy/src/config/{config_proxy_http.rs => proxy_http.rs} (100%) rename crates/rproxy/src/config/{config_proxy_ws.rs => proxy_ws.rs} (100%) rename crates/rproxy/src/config/{config_rpc.rs => rpc.rs} (96%) rename crates/rproxy/src/config/{config_tls.rs => tls.rs} (90%) delete mode 100644 crates/rproxy/src/jrpc/jrpc.rs rename crates/rproxy/src/metrics/{metrics_candlestick.rs => candlestick.rs} (100%) rename crates/rproxy/src/metrics/{metrics_labels.rs => labels.rs} (100%) delete mode 100644 crates/rproxy/src/metrics/metrics.rs delete mode 100644 crates/rproxy/src/proxy/proxy.rs rename crates/rproxy/src/proxy_http/{proxy_http.rs => http.rs} (88%) rename crates/rproxy/src/proxy_http/{proxy_http_inner.rs => inner.rs} (100%) rename crates/rproxy/src/proxy_http/{proxy_http_inner_authrpc.rs => inner_authrpc.rs} (91%) rename crates/rproxy/src/proxy_http/{proxy_http_inner_rpc.rs => inner_rpc.rs} (95%) rename 
crates/rproxy/src/proxy_ws/{proxy_ws_flashblocks.rs => flashblocks.rs} (100%) rename crates/rproxy/src/proxy_ws/{proxy_ws_inner.rs => inner.rs} (100%) rename crates/rproxy/src/proxy_ws/{proxy_ws.rs => ws.rs} (90%) delete mode 100644 crates/rproxy/src/server/server.rs diff --git a/Cargo.toml b/Cargo.toml index d825a03..74148d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,12 @@ default-members = ["crates/rproxy"] members = [ "crates/rproxy", ] + +[workspace.lints.rust] +unreachable_pub = "deny" + +[workspace.lints.clippy] +match_same_arms = "warn" +unused_async = "warn" +uninlined_format_args = "warn" +manual_let_else = "warn" diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index a6aa97d..6abb033 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -8,6 +8,9 @@ default-run = "rproxy" name = "rproxy" path = "src/bin/main.rs" +[lints] +workspace = true + [dependencies] actix = "0.13.5" actix-http = { version = "3.11.1", features = ["ws"] } diff --git a/crates/rproxy/src/circuit_breaker/circuit_breaker.rs b/crates/rproxy/src/circuit_breaker/circuit_breaker.rs deleted file mode 100644 index 8e734ae..0000000 --- a/crates/rproxy/src/circuit_breaker/circuit_breaker.rs +++ /dev/null @@ -1,202 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use awc::{ - Client, - Connector, - http::{self, Method, header}, -}; -use parking_lot::Mutex; -use tokio::sync::broadcast; -use tracing::{debug, error, warn}; - -use crate::config::ConfigCircuitBreaker; - -// CircuitBreakerInner ------------------------------------------------- - -struct CircuitBreakerInner { - curr_status: Status, - last_status: Status, - - streak_length: usize, -} - -// CircuitBreaker ------------------------------------------------------ - -pub(crate) struct CircuitBreaker { - config: ConfigCircuitBreaker, - inner: Arc>, - client: Client, -} - -impl CircuitBreaker { - pub(crate) fn new(config: ConfigCircuitBreaker) -> Self { - let client = Self::client(&config); - - Self { - config, - client, - inner: Arc::new(Mutex::new(CircuitBreakerInner { - curr_status: Status::Healthy, - last_status: Status::Healthy, - - streak_length: 0, - })), - } - } - - #[inline] - pub(crate) fn name() -> &'static str { - "circuit-breaker" - } - - #[inline] - fn timeout(config: &ConfigCircuitBreaker) -> Duration { - std::cmp::min(Duration::from_secs(5), config.poll_interval * 3 / 4) - } - - #[inline] - fn max_threshold(config: &ConfigCircuitBreaker) -> usize { - std::cmp::max(config.threshold_healthy, config.threshold_unhealthy) + 1 - } - - #[inline] - fn client(config: &ConfigCircuitBreaker) -> Client { - let host = config - .url() - .host() - .unwrap() // safety: verified on start - .to_string(); - let timeout = Self::timeout(config); - - Client::builder() - .add_default_header((header::HOST, host)) - .connector(Connector::new().timeout(timeout).handshake_timeout(timeout)) - .timeout(timeout) - .finish() - } - - pub(crate) async fn run( - self, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, - ) { - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - let mut ticker = tokio::time::interval(self.config.poll_interval); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - // spawning locally b/c actix client is thread-local by design - if let Err(err) = tokio::task::spawn_local(async move { - loop { - let resetter = resetter.clone(); - - tokio::select! 
{ - _ = ticker.tick() => { - self.poll(resetter).await; - } - - _ = canceller.cancelled() => { - break - } - } - } - }) - .await - { - warn!( - service = Self::name(), - error = ?err, - "Failure while running circuit-breaker", - ); - } - } - - async fn poll(&self, resetter: broadcast::Sender<()>) { - let req = self - .client - .request(Method::GET, self.config.url.clone()) - .timeout(Self::timeout(&self.config)); - - let status = match req.send().await { - Ok(res) => match res.status() { - http::StatusCode::OK => Status::Healthy, - _ => { - debug!( - service = Self::name(), - status = %res.status(), - "Unexpected backend status", - ); - Status::Unhealthy - } - }, - - Err(err) => { - debug!( - service = Self::name(), - error = ?err, - "Failed to poll health-status of the backend", - ); - Status::Unhealthy - } - }; - - let mut this = self.inner.lock(); - - if this.last_status == status { - if this.streak_length < Self::max_threshold(&self.config) { - this.streak_length += 1; // prevent overflow on long healthy runs - } - } else { - this.streak_length = 1 - } - this.last_status = status; - - match (this.curr_status.clone(), this.last_status.clone()) { - (Status::Healthy, Status::Healthy) => {} - - (Status::Healthy, Status::Unhealthy) => { - if this.streak_length < self.config.threshold_unhealthy { - return; - } - this.curr_status = Status::Unhealthy; - - warn!(service = Self::name(), "Backend became unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } - } - - (Status::Unhealthy, Status::Unhealthy) => { - warn!(service = Self::name(), "Backend is still unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } - } - - (Status::Unhealthy, Status::Healthy) => { - if this.streak_length == self.config.threshold_healthy { - this.curr_status = Status::Healthy; - } - } - } - } -} - -// Status -------------------------------------------------------------- - -#[derive(Clone, PartialEq)] -enum Status { - Healthy, - Unhealthy, -} diff --git a/crates/rproxy/src/circuit_breaker/mod.rs b/crates/rproxy/src/circuit_breaker/mod.rs index 9a13f67..8e734ae 100644 --- a/crates/rproxy/src/circuit_breaker/mod.rs +++ b/crates/rproxy/src/circuit_breaker/mod.rs @@ -1,2 +1,202 @@ -mod circuit_breaker; -pub(crate) use circuit_breaker::CircuitBreaker; +use std::{sync::Arc, time::Duration}; + +use awc::{ + Client, + Connector, + http::{self, Method, header}, +}; +use parking_lot::Mutex; +use tokio::sync::broadcast; +use tracing::{debug, error, warn}; + +use crate::config::ConfigCircuitBreaker; + +// CircuitBreakerInner ------------------------------------------------- + +struct CircuitBreakerInner { + curr_status: Status, + last_status: Status, + + streak_length: usize, +} + +// CircuitBreaker ------------------------------------------------------ + +pub(crate) struct CircuitBreaker { + config: ConfigCircuitBreaker, + inner: Arc>, + client: Client, +} + +impl CircuitBreaker { + pub(crate) fn new(config: ConfigCircuitBreaker) -> Self { + let client = Self::client(&config); + + Self { + config, + client, + inner: Arc::new(Mutex::new(CircuitBreakerInner { + curr_status: Status::Healthy, + last_status: Status::Healthy, + + streak_length: 0, + })), + } + } + + #[inline] + pub(crate) fn name() -> &'static str { + "circuit-breaker" + } + + #[inline] + fn timeout(config: &ConfigCircuitBreaker) -> Duration { + 
std::cmp::min(Duration::from_secs(5), config.poll_interval * 3 / 4) + } + + #[inline] + fn max_threshold(config: &ConfigCircuitBreaker) -> usize { + std::cmp::max(config.threshold_healthy, config.threshold_unhealthy) + 1 + } + + #[inline] + fn client(config: &ConfigCircuitBreaker) -> Client { + let host = config + .url() + .host() + .unwrap() // safety: verified on start + .to_string(); + let timeout = Self::timeout(config); + + Client::builder() + .add_default_header((header::HOST, host)) + .connector(Connector::new().timeout(timeout).handshake_timeout(timeout)) + .timeout(timeout) + .finish() + } + + pub(crate) async fn run( + self, + canceller: tokio_util::sync::CancellationToken, + resetter: broadcast::Sender<()>, + ) { + let canceller = canceller.clone(); + let resetter = resetter.clone(); + + let mut ticker = tokio::time::interval(self.config.poll_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // spawning locally b/c actix client is thread-local by design + if let Err(err) = tokio::task::spawn_local(async move { + loop { + let resetter = resetter.clone(); + + tokio::select! { + _ = ticker.tick() => { + self.poll(resetter).await; + } + + _ = canceller.cancelled() => { + break + } + } + } + }) + .await + { + warn!( + service = Self::name(), + error = ?err, + "Failure while running circuit-breaker", + ); + } + } + + async fn poll(&self, resetter: broadcast::Sender<()>) { + let req = self + .client + .request(Method::GET, self.config.url.clone()) + .timeout(Self::timeout(&self.config)); + + let status = match req.send().await { + Ok(res) => match res.status() { + http::StatusCode::OK => Status::Healthy, + _ => { + debug!( + service = Self::name(), + status = %res.status(), + "Unexpected backend status", + ); + Status::Unhealthy + } + }, + + Err(err) => { + debug!( + service = Self::name(), + error = ?err, + "Failed to poll health-status of the backend", + ); + Status::Unhealthy + } + }; + + let mut this = self.inner.lock(); + + if this.last_status == status { + if this.streak_length < Self::max_threshold(&self.config) { + this.streak_length += 1; // prevent overflow on long healthy runs + } + } else { + this.streak_length = 1 + } + this.last_status = status; + + match (this.curr_status.clone(), this.last_status.clone()) { + (Status::Healthy, Status::Healthy) => {} + + (Status::Healthy, Status::Unhealthy) => { + if this.streak_length < self.config.threshold_unhealthy { + return; + } + this.curr_status = Status::Unhealthy; + + warn!(service = Self::name(), "Backend became unhealthy, resetting..."); + + if let Err(err) = resetter.send(()) { + error!( + from = Self::name(), + error = ?err, + "Failed to broadcast reset signal", + ); + } + } + + (Status::Unhealthy, Status::Unhealthy) => { + warn!(service = Self::name(), "Backend is still unhealthy, resetting..."); + + if let Err(err) = resetter.send(()) { + error!( + from = Self::name(), + error = ?err, + "Failed to broadcast reset signal", + ); + } + } + + (Status::Unhealthy, Status::Healthy) => { + if this.streak_length == self.config.threshold_healthy { + this.curr_status = Status::Healthy; + } + } + } + } +} + +// Status -------------------------------------------------------------- + +#[derive(Clone, PartialEq)] +enum Status { + Healthy, + Unhealthy, +} diff --git a/crates/rproxy/src/config/config_authrpc.rs b/crates/rproxy/src/config/authrpc.rs similarity index 96% rename from crates/rproxy/src/config/config_authrpc.rs rename to crates/rproxy/src/config/authrpc.rs index e50e7c0..09d08be 100644 
--- a/crates/rproxy/src/config/config_authrpc.rs +++ b/crates/rproxy/src/config/authrpc.rs @@ -169,7 +169,7 @@ impl ConfigAuthrpc { // backend_url match Url::parse(&self.backend_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigAuthrpcError::BackendUrlMissesHost { url: self.backend_url.clone(), }); @@ -194,9 +194,9 @@ impl ConfigAuthrpc { // mirroring_peer_urls for peer_url in self.mirroring_peer_urls.iter() { - match Url::parse(&peer_url) { + match Url::parse(peer_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigAuthrpcError::PeerUrlMissesHost { url: peer_url.clone() }); } } @@ -222,7 +222,7 @@ impl ConfigAuthrpc { let backend_url = Url::parse(&self.backend_url.clone()).expect(ALREADY_VALIDATED); let backend_host = backend_url.host_str().expect(ALREADY_VALIDATED); - let backend_ips: Vec = match format!("{}:0", backend_host).to_socket_addrs() { + let backend_ips: Vec = match format!("{backend_host}:0").to_socket_addrs() { Ok(res) => res, Err(err) => { warn!(host = backend_host, error = ?err, "Failed to resolve backend host"); @@ -235,7 +235,7 @@ impl ConfigAuthrpc { let local_ips = get_all_local_ip_addresses(); self.mirroring_peer_urls.retain(|url| { - let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED); + let peer_url = Url::parse(url).expect(ALREADY_VALIDATED); let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED); if !peer_url.port().eq(&backend_url.port()) { @@ -248,7 +248,7 @@ impl ConfigAuthrpc { return false; } - let peer_ips: Vec = match format!("{}:0", peer_host).to_socket_addrs() { + let peer_ips: Vec = match format!("{peer_host}:0").to_socket_addrs() { Ok(res) => res, Err(err) => { warn!(host = peer_host, error = ?err, "Failed to resolve peer host"); diff --git a/crates/rproxy/src/config/config_circuit_breaker.rs b/crates/rproxy/src/config/circuit_breaker.rs similarity index 98% rename from crates/rproxy/src/config/config_circuit_breaker.rs rename to crates/rproxy/src/config/circuit_breaker.rs index 44127e1..c8d782a 100644 --- a/crates/rproxy/src/config/config_circuit_breaker.rs +++ b/crates/rproxy/src/config/circuit_breaker.rs @@ -90,10 +90,10 @@ impl ConfigCircuitBreaker { } // url - if self.url != "" { + if !self.url.is_empty() { match Url::parse(&self.url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigCircuitBreakerError::UrlMissesHost { url: self.url.clone(), }); diff --git a/crates/rproxy/src/config/config.rs b/crates/rproxy/src/config/config.rs deleted file mode 100644 index e715e73..0000000 --- a/crates/rproxy/src/config/config.rs +++ /dev/null @@ -1,206 +0,0 @@ -use std::{process, sync::LazyLock}; - -use clap::Parser; -use thiserror::Error; - -use crate::config::{ - ConfigAuthrpc, - ConfigAuthrpcError, - ConfigCircuitBreaker, - ConfigCircuitBreakerError, - ConfigFlashblocks, - ConfigFlashblocksError, - ConfigLogError, - ConfigLogging, - ConfigMetrics, - ConfigMetricsError, - ConfigRpc, - ConfigRpcError, - ConfigTls, - ConfigTlsError, -}; - -pub(crate) const ALREADY_VALIDATED: &str = "parameter must have been validated already"; - -pub(crate) static PARALLELISM: LazyLock = - LazyLock::new(|| std::thread::available_parallelism().map_or(2, std::num::NonZero::get)); - -pub(crate) static PARALLELISM_STRING: LazyLock = LazyLock::new(|| PARALLELISM.to_string()); - -// Config -------------------------------------------------------------- - -#[derive(Clone, Parser)] -#[command(about, author, long_about = None, term_width = 90, version)] 
-pub struct Config { - #[command(flatten)] - pub(crate) authrpc: ConfigAuthrpc, - - #[command(flatten)] - pub(crate) circuit_breaker: ConfigCircuitBreaker, - - #[command(flatten)] - pub(crate) flashblocks: ConfigFlashblocks, - - #[command(flatten)] - pub(crate) logging: ConfigLogging, - - #[command(flatten)] - pub(crate) metrics: ConfigMetrics, - - #[command(flatten)] - pub(crate) rpc: ConfigRpc, - - #[command(flatten)] - pub(crate) tls: ConfigTls, -} - -impl Config { - pub fn setup() -> Self { - let mut res = Config::parse(); - - if let Some(errs) = res.clone().validate() { - for err in errs.iter() { - eprintln!("fatal: {}", err); - } - process::exit(1); - }; - - res.logging.setup_logging(); - - res.preprocess(); - - res - } - - pub(crate) fn validate(self) -> Option> { - let mut errs: Vec = vec![]; - - // authrpc proxy - if self.rpc.enabled { - if let Some(_errs) = self.authrpc.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - } - - // circuit-breaker - if let Some(_errs) = self.circuit_breaker.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - - // flashblocks proxy - if self.flashblocks.enabled { - if let Some(_errs) = self.flashblocks.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - } - - // logging - if let Some(_errs) = self.logging.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - - // metrics - if let Some(_errs) = self.metrics.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - - // rpc proxy - if self.rpc.enabled { - if let Some(_errs) = self.rpc.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - } - - // tls - if self.tls.certificate != "" || self.tls.key != "" { - if let Some(_errs) = self.tls.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } - } - - if !self.authrpc.enabled && !self.flashblocks.enabled && !self.rpc.enabled { - errs.push(ConfigError::NoEnabledProxies); - } - - match errs.len() { - 0 => None, - _ => Some(errs), - } - } - - pub(crate) fn preprocess(&mut self) { - self.authrpc.preprocess(); - self.rpc.preprocess(); - } -} - -// ConfigError --------------------------------------------------------- - -#[derive(Debug, Error)] -pub(crate) enum ConfigError { - #[error("invalid authrpc proxy configuration: {0}")] - ConfigAuthrpcInvalid(ConfigAuthrpcError), - - #[error("invalid circuit-breaker configuration: {0}")] - ConfigCircuitBreakerInvalid(ConfigCircuitBreakerError), - - #[error("invalid flashblocks proxy configuration: {0}")] - ConfigFlashblocksInvalid(ConfigFlashblocksError), - - #[error("invalid logging configuration: {0}")] - ConfigLoggingInvalid(ConfigLogError), - - #[error("invalid metrics configuration: {0}")] - ConfigMetricsInvalid(ConfigMetricsError), - - #[error("invalid rpc proxy configuration: {0}")] - ConfigRpcInvalid(ConfigRpcError), - - #[error("invalid tls configuration: {0}")] - ConfigTlsInvalid(ConfigTlsError), - - #[error("no enabled proxies")] - NoEnabledProxies, -} - -impl From for ConfigError { - fn from(err: ConfigAuthrpcError) -> Self { - Self::ConfigAuthrpcInvalid(err) - } -} - -impl From for ConfigError { - fn from(err: ConfigCircuitBreakerError) -> Self { - Self::ConfigCircuitBreakerInvalid(err) - } -} - -impl From for ConfigError { - fn from(err: ConfigFlashblocksError) -> Self { - Self::ConfigFlashblocksInvalid(err) - } -} - -impl From for ConfigError { - fn 
from(err: ConfigLogError) -> Self { - Self::ConfigLoggingInvalid(err) - } -} - -impl From for ConfigError { - fn from(err: ConfigMetricsError) -> Self { - Self::ConfigMetricsInvalid(err) - } -} - -impl From for ConfigError { - fn from(err: ConfigRpcError) -> Self { - Self::ConfigRpcInvalid(err) - } -} - -impl From for ConfigError { - fn from(err: ConfigTlsError) -> Self { - Self::ConfigTlsInvalid(err) - } -} diff --git a/crates/rproxy/src/config/config_flashblocks.rs b/crates/rproxy/src/config/flashblocks.rs similarity index 98% rename from crates/rproxy/src/config/config_flashblocks.rs rename to crates/rproxy/src/config/flashblocks.rs index 929e94f..c693523 100644 --- a/crates/rproxy/src/config/config_flashblocks.rs +++ b/crates/rproxy/src/config/flashblocks.rs @@ -98,13 +98,13 @@ impl ConfigFlashblocks { // backend_url match self.backend_url.parse::() { Ok(uri) => { - if let None = uri.authority() { + if uri.authority().is_none() { errs.push(ConfigFlashblocksError::BackendUrlMissesHost { url: self.backend_url.clone(), }); } - if let None = uri.host() { + if uri.host().is_none() { errs.push(ConfigFlashblocksError::BackendUrlMissesHost { url: self.backend_url.clone(), }); diff --git a/crates/rproxy/src/config/config_logging.rs b/crates/rproxy/src/config/logging.rs similarity index 97% rename from crates/rproxy/src/config/config_logging.rs rename to crates/rproxy/src/config/logging.rs index a46a265..055e6aa 100644 --- a/crates/rproxy/src/config/config_logging.rs +++ b/crates/rproxy/src/config/logging.rs @@ -45,7 +45,7 @@ impl ConfigLogging { pub(crate) fn setup_logging(&self) { match self.format { - ConfigLogFormat::JSON => { + ConfigLogFormat::Json => { tracing_subscriber::registry() .with(EnvFilter::from(self.level.clone())) .with(fmt::layer().json().flatten_event(true)) @@ -66,7 +66,7 @@ impl ConfigLogging { #[derive(Clone, Debug, clap::ValueEnum)] pub(crate) enum ConfigLogFormat { - JSON, + Json, Text, } diff --git a/crates/rproxy/src/config/config_metrics.rs b/crates/rproxy/src/config/metrics.rs similarity index 100% rename from crates/rproxy/src/config/config_metrics.rs rename to crates/rproxy/src/config/metrics.rs diff --git a/crates/rproxy/src/config/mod.rs b/crates/rproxy/src/config/mod.rs index 4202988..d16e070 100644 --- a/crates/rproxy/src/config/mod.rs +++ b/crates/rproxy/src/config/mod.rs @@ -1,30 +1,232 @@ -mod config_authrpc; -pub(crate) use config_authrpc::*; +mod authrpc; +pub(crate) use authrpc::*; -mod config_circuit_breaker; -pub(crate) use config_circuit_breaker::*; +mod circuit_breaker; +pub(crate) use circuit_breaker::*; -mod config_flashblocks; -pub(crate) use config_flashblocks::*; +mod flashblocks; +pub(crate) use flashblocks::*; -mod config_logging; -pub(crate) use config_logging::*; +mod logging; +pub(crate) use logging::*; -mod config_metrics; -pub(crate) use config_metrics::*; +mod metrics; +pub(crate) use metrics::*; -mod config_proxy_http; -pub(crate) use config_proxy_http::*; +mod proxy_http; +pub(crate) use proxy_http::*; -mod config_proxy_ws; -pub(crate) use config_proxy_ws::*; +mod proxy_ws; +pub(crate) use proxy_ws::*; -mod config_rpc; -pub(crate) use config_rpc::*; +mod rpc; +pub(crate) use rpc::*; -mod config_tls; -pub(crate) use config_tls::*; +mod tls; +use std::{process, sync::LazyLock}; -mod config; -pub use config::Config; -pub(crate) use config::*; +use clap::Parser; +use thiserror::Error; +pub(crate) use tls::*; + +pub(crate) use crate::config::{ + ConfigAuthrpc, + ConfigAuthrpcError, + ConfigCircuitBreaker, + ConfigCircuitBreakerError, + 
ConfigFlashblocks, + ConfigFlashblocksError, + ConfigLogError, + ConfigLogging, + ConfigMetrics, + ConfigMetricsError, + ConfigRpc, + ConfigRpcError, + ConfigTls, + ConfigTlsError, +}; + +pub(crate) const ALREADY_VALIDATED: &str = "parameter must have been validated already"; + +pub(crate) static PARALLELISM: LazyLock = + LazyLock::new(|| std::thread::available_parallelism().map_or(2, std::num::NonZero::get)); + +pub(crate) static PARALLELISM_STRING: LazyLock = LazyLock::new(|| PARALLELISM.to_string()); + +// Config -------------------------------------------------------------- + +#[derive(Clone, Parser)] +#[command(about, author, long_about = None, term_width = 90, version)] +pub struct Config { + #[command(flatten)] + pub(crate) authrpc: ConfigAuthrpc, + + #[command(flatten)] + pub(crate) circuit_breaker: ConfigCircuitBreaker, + + #[command(flatten)] + pub(crate) flashblocks: ConfigFlashblocks, + + #[command(flatten)] + pub(crate) logging: ConfigLogging, + + #[command(flatten)] + pub(crate) metrics: ConfigMetrics, + + #[command(flatten)] + pub(crate) rpc: ConfigRpc, + + #[command(flatten)] + pub(crate) tls: ConfigTls, +} + +impl Config { + pub fn setup() -> Self { + let mut res = Config::parse(); + + if let Some(errs) = res.clone().validate() { + for err in errs.iter() { + eprintln!("fatal: {err}"); + } + process::exit(1); + }; + + res.logging.setup_logging(); + + res.preprocess(); + + res + } + + pub(crate) fn validate(self) -> Option> { + let mut errs: Vec = vec![]; + + // authrpc proxy + if self.rpc.enabled && + let Some(_errs) = self.authrpc.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // circuit-breaker + if let Some(_errs) = self.circuit_breaker.validate() { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // flashblocks proxy + if self.flashblocks.enabled && + let Some(_errs) = self.flashblocks.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // logging + if let Some(_errs) = self.logging.validate() { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // metrics + if let Some(_errs) = self.metrics.validate() { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // rpc proxy + if self.rpc.enabled && + let Some(_errs) = self.rpc.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + // tls + if (!self.tls.certificate.is_empty() || !self.tls.key.is_empty()) && + let Some(_errs) = self.tls.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); + } + + if !self.authrpc.enabled && !self.flashblocks.enabled && !self.rpc.enabled { + errs.push(ConfigError::NoEnabledProxies); + } + + match errs.len() { + 0 => None, + _ => Some(errs), + } + } + + pub(crate) fn preprocess(&mut self) { + self.authrpc.preprocess(); + self.rpc.preprocess(); + } +} + +// ConfigError --------------------------------------------------------- + +#[derive(Debug, Error)] +pub(crate) enum ConfigError { + #[error("invalid authrpc proxy configuration: {0}")] + ConfigAuthrpcInvalid(ConfigAuthrpcError), + + #[error("invalid circuit-breaker configuration: {0}")] + ConfigCircuitBreakerInvalid(ConfigCircuitBreakerError), + + #[error("invalid flashblocks proxy configuration: {0}")] + ConfigFlashblocksInvalid(ConfigFlashblocksError), + + #[error("invalid logging configuration: {0}")] + ConfigLoggingInvalid(ConfigLogError), + + #[error("invalid metrics configuration: {0}")] + 
ConfigMetricsInvalid(ConfigMetricsError), + + #[error("invalid rpc proxy configuration: {0}")] + ConfigRpcInvalid(ConfigRpcError), + + #[error("invalid tls configuration: {0}")] + ConfigTlsInvalid(ConfigTlsError), + + #[error("no enabled proxies")] + NoEnabledProxies, +} + +impl From for ConfigError { + fn from(err: ConfigAuthrpcError) -> Self { + Self::ConfigAuthrpcInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigCircuitBreakerError) -> Self { + Self::ConfigCircuitBreakerInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigFlashblocksError) -> Self { + Self::ConfigFlashblocksInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigLogError) -> Self { + Self::ConfigLoggingInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigMetricsError) -> Self { + Self::ConfigMetricsInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigRpcError) -> Self { + Self::ConfigRpcInvalid(err) + } +} + +impl From for ConfigError { + fn from(err: ConfigTlsError) -> Self { + Self::ConfigTlsInvalid(err) + } +} diff --git a/crates/rproxy/src/config/config_proxy_http.rs b/crates/rproxy/src/config/proxy_http.rs similarity index 100% rename from crates/rproxy/src/config/config_proxy_http.rs rename to crates/rproxy/src/config/proxy_http.rs diff --git a/crates/rproxy/src/config/config_proxy_ws.rs b/crates/rproxy/src/config/proxy_ws.rs similarity index 100% rename from crates/rproxy/src/config/config_proxy_ws.rs rename to crates/rproxy/src/config/proxy_ws.rs diff --git a/crates/rproxy/src/config/config_rpc.rs b/crates/rproxy/src/config/rpc.rs similarity index 96% rename from crates/rproxy/src/config/config_rpc.rs rename to crates/rproxy/src/config/rpc.rs index 3f5d4f6..9ea9a47 100644 --- a/crates/rproxy/src/config/config_rpc.rs +++ b/crates/rproxy/src/config/rpc.rs @@ -183,7 +183,7 @@ impl ConfigRpc { // backend_url match Url::parse(&self.backend_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigRpcError::BackendUrlMissesHost { url: self.backend_url.clone(), }); @@ -205,9 +205,9 @@ impl ConfigRpc { // mirroring_peer_urls for peer_url in self.mirroring_peer_urls.iter() { - match Url::parse(&peer_url) { + match Url::parse(peer_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigRpcError::PeerUrlMissesHost { url: peer_url.clone() }); } } @@ -233,7 +233,7 @@ impl ConfigRpc { let backend_url = Url::parse(&self.backend_url.clone()).expect(ALREADY_VALIDATED); let backend_host = backend_url.host_str().expect(ALREADY_VALIDATED); - let backend_ips: Vec = match format!("{}:0", backend_host).to_socket_addrs() { + let backend_ips: Vec = match format!("{backend_host}:0").to_socket_addrs() { Ok(res) => res, Err(err) => { warn!(host = backend_host, error = ?err, "Failed to resolve backend host"); @@ -246,7 +246,7 @@ impl ConfigRpc { let local_ips = get_all_local_ip_addresses(); self.mirroring_peer_urls.retain(|url| { - let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED); + let peer_url = Url::parse(url).expect(ALREADY_VALIDATED); let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED); if !peer_url.port().eq(&backend_url.port()) { @@ -259,7 +259,7 @@ impl ConfigRpc { return false; } - let peer_ips: Vec = match format!("{}:0", peer_host).to_socket_addrs() { + let peer_ips: Vec = match format!("{peer_host}:0").to_socket_addrs() { Ok(res) => res, Err(err) => { warn!(host = peer_host, error = ?err, "Failed to resolve peer host"); diff --git 
a/crates/rproxy/src/config/config_tls.rs b/crates/rproxy/src/config/tls.rs similarity index 90% rename from crates/rproxy/src/config/config_tls.rs rename to crates/rproxy/src/config/tls.rs index fdccd27..4aadf0c 100644 --- a/crates/rproxy/src/config/config_tls.rs +++ b/crates/rproxy/src/config/tls.rs @@ -53,11 +53,11 @@ impl ConfigTls { // certificate { - if self.certificate == "" && self.key != "" { + if self.certificate.is_empty() && !self.key.is_empty() { errs.push(ConfigTlsError::MissingCertificate); } - if self.certificate != "" { + if !self.certificate.is_empty() { match File::open(self.certificate.clone()) { Err(err) => { errs.push(ConfigTlsError::InvalidCertificateFile { @@ -104,11 +104,11 @@ impl ConfigTls { // key { - if self.certificate != "" && self.key == "" { + if !self.certificate.is_empty() && self.key.is_empty() { errs.push(ConfigTlsError::MissingKey); } - if self.key != "" { + if !self.key.is_empty() { match File::open(self.key.clone()) { Err(err) => { errs.push(ConfigTlsError::InvalidKeyFile { @@ -155,20 +155,15 @@ impl ConfigTls { // certificate + key { - match (cert, key) { - (Some(cert), Some(key)) => { - if let Err(err) = - ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key) - { - errs.push(ConfigTlsError::InvalidPair { - path_cert: self.certificate.clone(), - path_key: self.key.clone(), - err: err.to_string(), - }); - } - } - - (_, _) => {} + if let (Some(cert), Some(key)) = (cert, key) && + let Err(err) = + ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key) + { + errs.push(ConfigTlsError::InvalidPair { + path_cert: self.certificate.clone(), + path_key: self.key.clone(), + err: err.to_string(), + }); } } @@ -179,7 +174,7 @@ impl ConfigTls { } pub(crate) fn enabled(&self) -> bool { - self.certificate != "" && self.key != "" + !self.certificate.is_empty() && !self.key.is_empty() } pub(crate) fn key(&self) -> &PrivateKeyDer<'static> { diff --git a/crates/rproxy/src/jrpc/jrpc.rs b/crates/rproxy/src/jrpc/jrpc.rs deleted file mode 100644 index b2f7d61..0000000 --- a/crates/rproxy/src/jrpc/jrpc.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::borrow::Cow; - -use serde::Deserialize; - -// JrpcError ----------------------------------------------------------- - -#[derive(Clone, Deserialize)] -pub(crate) struct JrpcError { - // pub(crate) code: i64, - // pub(crate) message: String, -} - -// JrpcRequestMeta ----------------------------------------------------- - -const JRPC_METHOD_FCUV1_WITH_PAYLOAD: Cow<'static, str> = - Cow::Borrowed("engine_forkchoiceUpdatedV1_withPayload"); -const JRPC_METHOD_FCUV2_WITH_PAYLOAD: Cow<'static, str> = - Cow::Borrowed("engine_forkchoiceUpdatedV2_withPayload"); -const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> = - Cow::Borrowed("engine_forkchoiceUpdatedV3_withPayload"); - -pub(crate) struct JrpcRequestMeta { - method: Cow<'static, str>, - method_enriched: Cow<'static, str>, -} - -impl JrpcRequestMeta { - #[inline] - pub(crate) fn method(&self) -> Cow<'static, str> { - self.method.clone() - } - - #[inline] - pub(crate) fn method_enriched(&self) -> Cow<'static, str> { - self.method_enriched.clone() - } -} - -impl<'a> Deserialize<'a> for JrpcRequestMeta { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - #[derive(Deserialize)] - struct JrpcRequestMetaWire { - method: Cow<'static, str>, - params: Vec, - } - - let wire = JrpcRequestMetaWire::deserialize(deserializer)?; - - let mut params_count = 0; - for param in wire.params.iter() { - if !param.is_null() { - 
params_count += 1; - } - } - - if params_count < 2 { - return Ok(Self { method: wire.method.clone(), method_enriched: wire.method.clone() }); - } - - let method_enriched = match wire.method.as_ref() { - "engine_forkchoiceUpdatedV1" => JRPC_METHOD_FCUV1_WITH_PAYLOAD.clone(), - "engine_forkchoiceUpdatedV2" => JRPC_METHOD_FCUV2_WITH_PAYLOAD.clone(), - "engine_forkchoiceUpdatedV3" => JRPC_METHOD_FCUV3_WITH_PAYLOAD.clone(), - - _ => wire.method.clone(), - }; - - Ok(Self { method: wire.method, method_enriched }) - } -} - -// JrpcRequestMetaMaybeBatch ------------------------------------------- - -const JRPC_METHOD_BATCH: Cow<'static, str> = Cow::Borrowed("batch"); - -#[derive(Deserialize)] -#[serde(untagged)] -pub(crate) enum JrpcRequestMetaMaybeBatch { - Single(JrpcRequestMeta), - Batch(Vec), -} - -impl JrpcRequestMetaMaybeBatch { - pub(crate) fn method_enriched(&self) -> Cow<'static, str> { - match self { - Self::Single(jrpc) => jrpc.method_enriched.clone(), - Self::Batch(_) => JRPC_METHOD_BATCH.clone(), - } - } -} - -// JrpcResponseMeta ---------------------------------------------------- - -#[derive(Clone, Deserialize)] -pub(crate) struct JrpcResponseMeta { - pub(crate) error: Option, -} diff --git a/crates/rproxy/src/jrpc/mod.rs b/crates/rproxy/src/jrpc/mod.rs index 5742f20..b2f7d61 100644 --- a/crates/rproxy/src/jrpc/mod.rs +++ b/crates/rproxy/src/jrpc/mod.rs @@ -1,2 +1,100 @@ -mod jrpc; -pub(crate) use jrpc::*; +use std::borrow::Cow; + +use serde::Deserialize; + +// JrpcError ----------------------------------------------------------- + +#[derive(Clone, Deserialize)] +pub(crate) struct JrpcError { + // pub(crate) code: i64, + // pub(crate) message: String, +} + +// JrpcRequestMeta ----------------------------------------------------- + +const JRPC_METHOD_FCUV1_WITH_PAYLOAD: Cow<'static, str> = + Cow::Borrowed("engine_forkchoiceUpdatedV1_withPayload"); +const JRPC_METHOD_FCUV2_WITH_PAYLOAD: Cow<'static, str> = + Cow::Borrowed("engine_forkchoiceUpdatedV2_withPayload"); +const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> = + Cow::Borrowed("engine_forkchoiceUpdatedV3_withPayload"); + +pub(crate) struct JrpcRequestMeta { + method: Cow<'static, str>, + method_enriched: Cow<'static, str>, +} + +impl JrpcRequestMeta { + #[inline] + pub(crate) fn method(&self) -> Cow<'static, str> { + self.method.clone() + } + + #[inline] + pub(crate) fn method_enriched(&self) -> Cow<'static, str> { + self.method_enriched.clone() + } +} + +impl<'a> Deserialize<'a> for JrpcRequestMeta { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + #[derive(Deserialize)] + struct JrpcRequestMetaWire { + method: Cow<'static, str>, + params: Vec, + } + + let wire = JrpcRequestMetaWire::deserialize(deserializer)?; + + let mut params_count = 0; + for param in wire.params.iter() { + if !param.is_null() { + params_count += 1; + } + } + + if params_count < 2 { + return Ok(Self { method: wire.method.clone(), method_enriched: wire.method.clone() }); + } + + let method_enriched = match wire.method.as_ref() { + "engine_forkchoiceUpdatedV1" => JRPC_METHOD_FCUV1_WITH_PAYLOAD.clone(), + "engine_forkchoiceUpdatedV2" => JRPC_METHOD_FCUV2_WITH_PAYLOAD.clone(), + "engine_forkchoiceUpdatedV3" => JRPC_METHOD_FCUV3_WITH_PAYLOAD.clone(), + + _ => wire.method.clone(), + }; + + Ok(Self { method: wire.method, method_enriched }) + } +} + +// JrpcRequestMetaMaybeBatch ------------------------------------------- + +const JRPC_METHOD_BATCH: Cow<'static, str> = Cow::Borrowed("batch"); + +#[derive(Deserialize)] 
+#[serde(untagged)] +pub(crate) enum JrpcRequestMetaMaybeBatch { + Single(JrpcRequestMeta), + Batch(Vec), +} + +impl JrpcRequestMetaMaybeBatch { + pub(crate) fn method_enriched(&self) -> Cow<'static, str> { + match self { + Self::Single(jrpc) => jrpc.method_enriched.clone(), + Self::Batch(_) => JRPC_METHOD_BATCH.clone(), + } + } +} + +// JrpcResponseMeta ---------------------------------------------------- + +#[derive(Clone, Deserialize)] +pub(crate) struct JrpcResponseMeta { + pub(crate) error: Option, +} diff --git a/crates/rproxy/src/metrics/metrics_candlestick.rs b/crates/rproxy/src/metrics/candlestick.rs similarity index 100% rename from crates/rproxy/src/metrics/metrics_candlestick.rs rename to crates/rproxy/src/metrics/candlestick.rs diff --git a/crates/rproxy/src/metrics/metrics_labels.rs b/crates/rproxy/src/metrics/labels.rs similarity index 100% rename from crates/rproxy/src/metrics/metrics_labels.rs rename to crates/rproxy/src/metrics/labels.rs diff --git a/crates/rproxy/src/metrics/metrics.rs b/crates/rproxy/src/metrics/metrics.rs deleted file mode 100644 index 223f28b..0000000 --- a/crates/rproxy/src/metrics/metrics.rs +++ /dev/null @@ -1,369 +0,0 @@ -use std::{net::TcpListener, sync::Arc, time::Duration}; - -use actix_web::{ - App, - HttpRequest, - HttpResponse, - HttpServer, - middleware::{NormalizePath, TrailingSlash}, - web, -}; -use awc::http::Method; -use prometheus_client::{ - metrics::{counter::Counter, family::Family, gauge::Gauge}, - registry::{Registry, Unit}, -}; -use socket2::{SockAddr, Socket, TcpKeepalive}; -use tracing::{error, info}; - -use crate::{ - config::ConfigMetrics, - metrics::{ - Candlestick, - LabelsProxy, - LabelsProxyClientInfo, - LabelsProxyHttpJrpc, - LabelsProxyWs, - }, -}; - -// Metrics ------------------------------------------------------------- - -pub(crate) struct Metrics { - config: ConfigMetrics, - registry: Registry, - - pub(crate) client_connections_active_count: Family, - pub(crate) client_connections_established_count: Family, - pub(crate) client_connections_closed_count: Family, - pub(crate) client_info: Family, - - pub(crate) http_latency_backend: Family, - pub(crate) http_latency_delta: Family, - pub(crate) http_latency_total: Family, - - pub(crate) http_mirror_success_count: Family, - pub(crate) http_mirror_failure_count: Family, - - pub(crate) http_proxy_success_count: Family, - pub(crate) http_proxy_failure_count: Family, - - pub(crate) http_request_size: Family, - pub(crate) http_response_size: Family, - - pub(crate) http_request_decompressed_size: Family, - pub(crate) http_response_decompressed_size: Family, - - pub(crate) tls_certificate_valid_not_before: Gauge, - pub(crate) tls_certificate_valid_not_after: Gauge, - - pub(crate) ws_latency_backend: Family, - pub(crate) ws_latency_client: Family, - pub(crate) ws_latency_proxy: Family, - - pub(crate) ws_message_size: Family, - - pub(crate) ws_proxy_success_count: Family, - pub(crate) ws_proxy_failure_count: Family, -} - -impl Metrics { - pub(crate) fn new(config: ConfigMetrics) -> Self { - let mut this = Metrics { - config, - registry: Registry::with_prefix("rproxy"), - - client_connections_active_count: Family::default(), - client_connections_established_count: Family::default(), - client_connections_closed_count: Family::default(), - - client_info: Family::default(), - - http_latency_backend: Family::default(), - http_latency_delta: Family::default(), - http_latency_total: Family::default(), - - http_mirror_success_count: Family::default(), - http_mirror_failure_count: 
Family::default(), - - http_proxy_success_count: Family::default(), - http_proxy_failure_count: Family::default(), - - http_request_size: Family::default(), - http_response_size: Family::default(), - - http_request_decompressed_size: Family::default(), - http_response_decompressed_size: Family::default(), - - tls_certificate_valid_not_before: Gauge::default(), - tls_certificate_valid_not_after: Gauge::default(), - - ws_latency_backend: Family::default(), - ws_latency_client: Family::default(), - ws_latency_proxy: Family::default(), - - ws_message_size: Family::default(), - - ws_proxy_success_count: Family::default(), - ws_proxy_failure_count: Family::default(), - }; - - this.registry.register( - "client_connections_active_count", - "count of active client connections", - this.client_connections_active_count.clone(), - ); - - this.registry.register( - "client_connections_established_count", - "count of client connections established", - this.client_connections_established_count.clone(), - ); - - this.registry.register( - "client_info", - "general information about the client", - this.client_info.clone(), - ); - - this.registry.register( - "client_connections_closed_count", - "count of client connections closed", - this.client_connections_closed_count.clone(), - ); - - this.registry.register_with_unit( - "http_latency_backend", - "latency of backend http responses (interval b/w end of client's request and begin of backend's response)", - Unit::Other(String::from("nanoseconds")), - this.http_latency_backend.clone(), - ); - - this.registry.register_with_unit( - "http_latency_delta", - "latency delta (http_latency_total - http_latency_backend)", - Unit::Other(String::from("nanoseconds")), - this.http_latency_delta.clone(), - ); - - this.registry.register_with_unit( - "http_latency_total", - "overall latency of http requests (interval b/w begin of client's request and end of forwarded response)", - Unit::Other(String::from("nanoseconds")), - this.http_latency_total.clone(), - ); - - this.registry.register( - "http_mirror_success_count", - "count of successfully mirrored http requests/responses", - this.http_mirror_success_count.clone(), - ); - - this.registry.register( - "http_mirror_failure_count", - "count of failures to mirror http request/response", - this.http_mirror_failure_count.clone(), - ); - - this.registry.register( - "http_proxy_success_count", - "count of successfully proxied http requests/responses", - this.http_proxy_success_count.clone(), - ); - - this.registry.register( - "http_proxy_failure_count", - "count of failures to proxy http request/response", - this.http_proxy_failure_count.clone(), - ); - - this.registry.register_with_unit( - "http_request_size", - "sizes of incoming http requests", - Unit::Bytes, - this.http_request_size.clone(), - ); - - this.registry.register_with_unit( - "http_response_size", - "sizes of proxied http responses", - Unit::Bytes, - this.http_response_size.clone(), - ); - - this.registry.register_with_unit( - "http_request_decompressed_size", - "decompressed sizes of incoming http requests", - Unit::Bytes, - this.http_request_decompressed_size.clone(), - ); - - this.registry.register_with_unit( - "http_response_decompressed_size", - "decompressed sizes of proxied http responses", - Unit::Bytes, - this.http_response_decompressed_size.clone(), - ); - - this.registry.register( - "tls_certificate_valid_not_before", - "tls certificate's not-valid-before timestamp", - this.tls_certificate_valid_not_before.clone(), - ); - - this.registry.register( - 
"tls_certificate_valid_not_after", - "tls certificate's not-valid-after timestamp", - this.tls_certificate_valid_not_after.clone(), - ); - - this.registry.register_with_unit( - "ws_latency_backend", - "round-trip-time of websocket pings to backend divided by 2", - Unit::Other(String::from("nanoseconds")), - this.ws_latency_backend.clone(), - ); - - this.registry.register_with_unit( - "ws_latency_client", - "round-trip-time of websocket pings to backend divided by 2", - Unit::Other(String::from("nanoseconds")), - this.ws_latency_client.clone(), - ); - - this.registry.register_with_unit( - "ws_latency_proxy", - "time to process the websocket message by the proxy", - Unit::Other(String::from("nanoseconds")), - this.ws_latency_proxy.clone(), - ); - - this.registry.register_with_unit( - "ws_message_size", - "sizes of proxied websocket messages", - Unit::Bytes, - this.ws_message_size.clone(), - ); - - this.registry.register( - "ws_proxy_success_count", - "count of successfully proxied websocket messages", - this.ws_proxy_success_count.clone(), - ); - - this.registry.register( - "ws_proxy_failure_count", - "count of failures to proxy websocket message", - this.ws_proxy_failure_count.clone(), - ); - - this - } - - #[inline] - pub(crate) fn name() -> &'static str { - "metrics" - } - - pub(crate) async fn run( - self: Arc, - canceller: tokio_util::sync::CancellationToken, - ) -> Result<(), Box> { - let listen_address = self.config.listen_address().clone(); - - let listener = match self.listen() { - Ok(listener) => listener, - Err(err) => { - error!( - service = Self::name(), - addr = %&self.config.listen_address(), - error = ?err, - "Failed to initialise a socket" - ); - return Err(Box::new(err)); - } - }; - - let server = match HttpServer::new(move || { - App::new() - .app_data(web::Data::new(self.clone())) - .wrap(NormalizePath::new(TrailingSlash::Trim)) - .default_service(web::route().to(Self::receive)) - }) - .workers(1) - .shutdown_signal(canceller.cancelled_owned()) - .listen(listener) - { - Ok(metrics) => metrics, - Err(err) => { - error!(service = Self::name(), error = ?err, "Failed to initialise http service"); - return Err(Box::new(err)); - } - }; - - info!( - service = Self::name(), - listen_address = %listen_address, - "Starting http service...", - ); - - if let Err(err) = server.run().await { - error!(service = Self::name(), error = ?err, "Failure while running http service") - } - - info!(service = Self::name(), "Stopped http service"); - - Ok(()) - } - - fn listen(&self) -> std::io::Result { - let socket = Socket::new( - socket2::Domain::for_address(self.config.listen_address()), - socket2::Type::STREAM, - Some(socket2::Protocol::TCP), - )?; - - // must use non-blocking with tokio - socket.set_nonblocking(true)?; - - // allow time to flush buffers on close - socket.set_linger(Some(Duration::from_secs(1)))?; - - // allow binding to the socket whlie there are still TIME_WAIT conns - socket.set_reuse_address(true)?; - - socket.set_tcp_keepalive( - &TcpKeepalive::new() - .with_time(Duration::from_secs(15)) - .with_interval(Duration::from_secs(15)) - .with_retries(4), - )?; - - socket.bind(&SockAddr::from(self.config.listen_address()))?; - - socket.listen(16)?; - - Ok(socket.into()) - } - - async fn receive( - req: HttpRequest, - _: web::Payload, - this: web::Data>, - ) -> Result { - if req.method() != Method::GET { - return Ok(HttpResponse::BadRequest().finish()); - } - - let mut body = String::new(); - - if let Err(err) = prometheus_client::encoding::text::encode(&mut body, 
&this.registry) { - error!(service = Self::name(), error = ?err, "Failed to encode metrics"); - return Ok(HttpResponse::InternalServerError().finish()); - } - - Ok(HttpResponse::Ok() - .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8") - .body(body)) - } -} diff --git a/crates/rproxy/src/metrics/mod.rs b/crates/rproxy/src/metrics/mod.rs index 3cf2ed5..3fbc356 100644 --- a/crates/rproxy/src/metrics/mod.rs +++ b/crates/rproxy/src/metrics/mod.rs @@ -1,8 +1,371 @@ -mod metrics_candlestick; -pub(crate) use metrics_candlestick::Candlestick; +mod candlestick; +pub(crate) use candlestick::Candlestick; -mod metrics_labels; -pub(crate) use metrics_labels::*; +mod labels; +use std::{net::TcpListener, sync::Arc, time::Duration}; -mod metrics; -pub(crate) use metrics::Metrics; +use actix_web::{ + App, + HttpRequest, + HttpResponse, + HttpServer, + middleware::{NormalizePath, TrailingSlash}, + web, +}; +use awc::http::Method; +use prometheus_client::{ + metrics::{counter::Counter, family::Family, gauge::Gauge}, + registry::{Registry, Unit}, +}; +use socket2::{SockAddr, Socket, TcpKeepalive}; +use tracing::{error, info}; + +use crate::config::ConfigMetrics; +pub(crate) use crate::metrics::labels::{ + LabelsProxy, + LabelsProxyClientInfo, + LabelsProxyHttpJrpc, + LabelsProxyWs, +}; + +// Metrics ------------------------------------------------------------- + +pub(crate) struct Metrics { + config: ConfigMetrics, + registry: Registry, + + pub(crate) client_connections_active_count: Family, + pub(crate) client_connections_established_count: Family, + pub(crate) client_connections_closed_count: Family, + pub(crate) client_info: Family, + + pub(crate) http_latency_backend: Family, + pub(crate) http_latency_delta: Family, + pub(crate) http_latency_total: Family, + + pub(crate) http_mirror_success_count: Family, + pub(crate) http_mirror_failure_count: Family, + + pub(crate) http_proxy_success_count: Family, + pub(crate) http_proxy_failure_count: Family, + + pub(crate) http_request_size: Family, + pub(crate) http_response_size: Family, + + pub(crate) http_request_decompressed_size: Family, + pub(crate) http_response_decompressed_size: Family, + + pub(crate) tls_certificate_valid_not_before: Gauge, + pub(crate) tls_certificate_valid_not_after: Gauge, + + pub(crate) ws_latency_backend: Family, + pub(crate) ws_latency_client: Family, + pub(crate) ws_latency_proxy: Family, + + pub(crate) ws_message_size: Family, + + pub(crate) ws_proxy_success_count: Family, + pub(crate) ws_proxy_failure_count: Family, +} + +impl Metrics { + pub(crate) fn new(config: ConfigMetrics) -> Self { + let mut this = Metrics { + config, + registry: Registry::with_prefix("rproxy"), + + client_connections_active_count: Family::default(), + client_connections_established_count: Family::default(), + client_connections_closed_count: Family::default(), + + client_info: Family::default(), + + http_latency_backend: Family::default(), + http_latency_delta: Family::default(), + http_latency_total: Family::default(), + + http_mirror_success_count: Family::default(), + http_mirror_failure_count: Family::default(), + + http_proxy_success_count: Family::default(), + http_proxy_failure_count: Family::default(), + + http_request_size: Family::default(), + http_response_size: Family::default(), + + http_request_decompressed_size: Family::default(), + http_response_decompressed_size: Family::default(), + + tls_certificate_valid_not_before: Gauge::default(), + tls_certificate_valid_not_after: Gauge::default(), + + ws_latency_backend: 
Family::default(), + ws_latency_client: Family::default(), + ws_latency_proxy: Family::default(), + + ws_message_size: Family::default(), + + ws_proxy_success_count: Family::default(), + ws_proxy_failure_count: Family::default(), + }; + + this.registry.register( + "client_connections_active_count", + "count of active client connections", + this.client_connections_active_count.clone(), + ); + + this.registry.register( + "client_connections_established_count", + "count of client connections established", + this.client_connections_established_count.clone(), + ); + + this.registry.register( + "client_info", + "general information about the client", + this.client_info.clone(), + ); + + this.registry.register( + "client_connections_closed_count", + "count of client connections closed", + this.client_connections_closed_count.clone(), + ); + + this.registry.register_with_unit( + "http_latency_backend", + "latency of backend http responses (interval b/w end of client's request and begin of backend's response)", + Unit::Other(String::from("nanoseconds")), + this.http_latency_backend.clone(), + ); + + this.registry.register_with_unit( + "http_latency_delta", + "latency delta (http_latency_total - http_latency_backend)", + Unit::Other(String::from("nanoseconds")), + this.http_latency_delta.clone(), + ); + + this.registry.register_with_unit( + "http_latency_total", + "overall latency of http requests (interval b/w begin of client's request and end of forwarded response)", + Unit::Other(String::from("nanoseconds")), + this.http_latency_total.clone(), + ); + + this.registry.register( + "http_mirror_success_count", + "count of successfully mirrored http requests/responses", + this.http_mirror_success_count.clone(), + ); + + this.registry.register( + "http_mirror_failure_count", + "count of failures to mirror http request/response", + this.http_mirror_failure_count.clone(), + ); + + this.registry.register( + "http_proxy_success_count", + "count of successfully proxied http requests/responses", + this.http_proxy_success_count.clone(), + ); + + this.registry.register( + "http_proxy_failure_count", + "count of failures to proxy http request/response", + this.http_proxy_failure_count.clone(), + ); + + this.registry.register_with_unit( + "http_request_size", + "sizes of incoming http requests", + Unit::Bytes, + this.http_request_size.clone(), + ); + + this.registry.register_with_unit( + "http_response_size", + "sizes of proxied http responses", + Unit::Bytes, + this.http_response_size.clone(), + ); + + this.registry.register_with_unit( + "http_request_decompressed_size", + "decompressed sizes of incoming http requests", + Unit::Bytes, + this.http_request_decompressed_size.clone(), + ); + + this.registry.register_with_unit( + "http_response_decompressed_size", + "decompressed sizes of proxied http responses", + Unit::Bytes, + this.http_response_decompressed_size.clone(), + ); + + this.registry.register( + "tls_certificate_valid_not_before", + "tls certificate's not-valid-before timestamp", + this.tls_certificate_valid_not_before.clone(), + ); + + this.registry.register( + "tls_certificate_valid_not_after", + "tls certificate's not-valid-after timestamp", + this.tls_certificate_valid_not_after.clone(), + ); + + this.registry.register_with_unit( + "ws_latency_backend", + "round-trip-time of websocket pings to backend divided by 2", + Unit::Other(String::from("nanoseconds")), + this.ws_latency_backend.clone(), + ); + + this.registry.register_with_unit( + "ws_latency_client", + "round-trip-time of websocket pings to 
backend divided by 2", + Unit::Other(String::from("nanoseconds")), + this.ws_latency_client.clone(), + ); + + this.registry.register_with_unit( + "ws_latency_proxy", + "time to process the websocket message by the proxy", + Unit::Other(String::from("nanoseconds")), + this.ws_latency_proxy.clone(), + ); + + this.registry.register_with_unit( + "ws_message_size", + "sizes of proxied websocket messages", + Unit::Bytes, + this.ws_message_size.clone(), + ); + + this.registry.register( + "ws_proxy_success_count", + "count of successfully proxied websocket messages", + this.ws_proxy_success_count.clone(), + ); + + this.registry.register( + "ws_proxy_failure_count", + "count of failures to proxy websocket message", + this.ws_proxy_failure_count.clone(), + ); + + this + } + + #[inline] + pub(crate) fn name() -> &'static str { + "metrics" + } + + pub(crate) async fn run( + self: Arc, + canceller: tokio_util::sync::CancellationToken, + ) -> Result<(), Box> { + let listen_address = self.config.listen_address(); + + let listener = match self.listen() { + Ok(listener) => listener, + Err(err) => { + error!( + service = Self::name(), + addr = %&self.config.listen_address(), + error = ?err, + "Failed to initialise a socket" + ); + return Err(Box::new(err)); + } + }; + + let server = match HttpServer::new(move || { + App::new() + .app_data(web::Data::new(self.clone())) + .wrap(NormalizePath::new(TrailingSlash::Trim)) + .default_service(web::route().to(Self::receive)) + }) + .workers(1) + .shutdown_signal(canceller.cancelled_owned()) + .listen(listener) + { + Ok(metrics) => metrics, + Err(err) => { + error!(service = Self::name(), error = ?err, "Failed to initialise http service"); + return Err(Box::new(err)); + } + }; + + info!( + service = Self::name(), + listen_address = %listen_address, + "Starting http service...", + ); + + if let Err(err) = server.run().await { + error!(service = Self::name(), error = ?err, "Failure while running http service") + } + + info!(service = Self::name(), "Stopped http service"); + + Ok(()) + } + + fn listen(&self) -> std::io::Result { + let socket = Socket::new( + socket2::Domain::for_address(self.config.listen_address()), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + + // must use non-blocking with tokio + socket.set_nonblocking(true)?; + + // allow time to flush buffers on close + socket.set_linger(Some(Duration::from_secs(1)))?; + + // allow binding to the socket whlie there are still TIME_WAIT conns + socket.set_reuse_address(true)?; + + socket.set_tcp_keepalive( + &TcpKeepalive::new() + .with_time(Duration::from_secs(15)) + .with_interval(Duration::from_secs(15)) + .with_retries(4), + )?; + + socket.bind(&SockAddr::from(self.config.listen_address()))?; + + socket.listen(16)?; + + Ok(socket.into()) + } + + #[expect(clippy::unused_async, reason = "required by the actix framework")] + async fn receive( + req: HttpRequest, + _: web::Payload, + this: web::Data>, + ) -> Result { + if req.method() != Method::GET { + return Ok(HttpResponse::BadRequest().finish()); + } + + let mut body = String::new(); + + if let Err(err) = prometheus_client::encoding::text::encode(&mut body, &this.registry) { + error!(service = Self::name(), error = ?err, "Failed to encode metrics"); + return Ok(HttpResponse::InternalServerError().finish()); + } + + Ok(HttpResponse::Ok() + .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8") + .body(body)) + } +} diff --git a/crates/rproxy/src/proxy/mod.rs b/crates/rproxy/src/proxy/mod.rs index c677ef9..6678460 100644 
--- a/crates/rproxy/src/proxy/mod.rs +++ b/crates/rproxy/src/proxy/mod.rs @@ -1,2 +1,138 @@ -mod proxy; -pub(crate) use proxy::{Proxy, ProxyConnectionGuard, ProxyInner}; +use std::{ + any::Any, + sync::{ + Arc, + atomic::{AtomicI64, Ordering}, + }, +}; + +use actix_web::dev::Extensions; +use tracing::{debug, warn}; +use uuid::Uuid; + +use crate::metrics::{LabelsProxy, Metrics}; + +// Proxy --------------------------------------------------------------- + +pub(crate) trait Proxy
<P>
+where + P: ProxyInner, +{ + fn on_connect( + metrics: Arc, + client_connections_count: Arc, + ) -> impl Fn(&dyn Any, &mut Extensions) { + move |connection, extensions| { + { + let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; + let metric_labels = LabelsProxy { proxy: P::name() }; + + metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); + metrics.client_connections_established_count.get_or_create(&metric_labels).inc(); + } + + let stream: Option<&actix_web::rt::net::TcpStream> = if let Some(stream) = connection.downcast_ref::>() { + let (stream, _) = stream.get_ref(); + Some(stream) + } else if let Some(stream) = connection.downcast_ref::() { + Some(stream) + } else { + warn!("Unexpected stream type"); + None + }; + + if let Some(stream) = stream { + let id = Uuid::now_v7(); + + let remote_addr = match stream.peer_addr() { + Ok(local_addr) => Some(local_addr.to_string()), + Err(err) => { + warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); + None + } + }; + let local_addr = match stream.local_addr() { + Ok(local_addr) => Some(local_addr.to_string()), + Err(err) => { + warn!(proxy = P::name(), error = ?err, "Failed to get local address"); + None + } + }; + + debug!( + proxy = P::name(), + connection_id = %id, + remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()), + local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()), + "Client connection open" + ); + + extensions.insert(ProxyConnectionGuard::new( + id, + P::name(), + remote_addr, + local_addr, + &metrics, + client_connections_count.clone(), + )); + } + } + } +} + +// ProxyInner ---------------------------------------------------------- + +pub(crate) trait ProxyInner: 'static { + fn name() -> &'static str; +} + +// ProxyConnectionGuard ------------------------------------------------ + +pub(crate) struct ProxyConnectionGuard { + pub id: Uuid, + pub remote_addr: Option, + pub local_addr: Option, + + proxy_name: &'static str, + metrics: Arc, + client_connections_count: Arc, +} + +impl ProxyConnectionGuard { + fn new( + id: Uuid, + proxy_name: &'static str, + remote_addr: Option, + local_addr: Option, + metrics: &Arc, + client_connections_count: Arc, + ) -> Self { + Self { + id, + remote_addr, + local_addr, + proxy_name, + metrics: metrics.clone(), + client_connections_count, + } + } +} + +impl Drop for ProxyConnectionGuard { + fn drop(&mut self) { + let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; + + let metric_labels = LabelsProxy { proxy: self.proxy_name }; + + self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); + self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); + + debug!( + proxy = self.proxy_name, + connection_id = %self.id, + remote_addr = self.remote_addr.as_ref().map_or("unknown", |v| v.as_str()), + local_addr = self.local_addr.as_ref().map_or("unknown", |v| v.as_str()), + "Client connection closed" + ); + } +} diff --git a/crates/rproxy/src/proxy/proxy.rs b/crates/rproxy/src/proxy/proxy.rs deleted file mode 100644 index b4283cb..0000000 --- a/crates/rproxy/src/proxy/proxy.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::{ - any::Any, - sync::{ - Arc, - atomic::{AtomicI64, Ordering}, - }, -}; - -use actix_web::dev::Extensions; -use tracing::{debug, warn}; -use uuid::Uuid; - -use crate::metrics::{LabelsProxy, Metrics}; - -// Proxy --------------------------------------------------------------- - -pub(crate) trait Proxy
<P>
-where - P: ProxyInner, -{ - fn on_connect( - metrics: Arc, - client_connections_count: Arc, - ) -> impl Fn(&dyn Any, &mut Extensions) { - move |connection, extensions| { - { - let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; - let metric_labels = LabelsProxy { proxy: P::name() }; - - metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); - metrics.client_connections_established_count.get_or_create(&metric_labels).inc(); - } - - let stream: Option<&actix_web::rt::net::TcpStream> = if let Some(stream) = connection.downcast_ref::>() { - let (stream, _) = stream.get_ref(); - Some(stream) - } else if let Some(stream) = connection.downcast_ref::() { - Some(stream) - } else { - warn!("Unexpected stream type"); - None - }; - - if let Some(stream) = stream { - let id = Uuid::now_v7(); - - let remote_addr = match stream.peer_addr() { - Ok(local_addr) => Some(local_addr.to_string()), - Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); - None - } - }; - let local_addr = match stream.local_addr() { - Ok(local_addr) => Some(local_addr.to_string()), - Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); - None - } - }; - - debug!( - proxy = P::name(), - connection_id = %id, - remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()), - local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()), - "Client connection open" - ); - - extensions.insert(ProxyConnectionGuard::new( - id, - P::name(), - remote_addr, - local_addr, - &metrics, - client_connections_count.clone(), - )); - } - } - } -} - -// ProxyInner ---------------------------------------------------------- - -pub(crate) trait ProxyInner: 'static { - fn name() -> &'static str; -} - -// ProxyConnectionGuard ------------------------------------------------ - -pub struct ProxyConnectionGuard { - pub id: Uuid, - pub remote_addr: Option, - pub local_addr: Option, - - proxy_name: &'static str, - metrics: Arc, - client_connections_count: Arc, -} - -impl ProxyConnectionGuard { - fn new( - id: Uuid, - proxy_name: &'static str, - remote_addr: Option, - local_addr: Option, - metrics: &Arc, - client_connections_count: Arc, - ) -> Self { - Self { - id, - remote_addr, - local_addr, - proxy_name, - metrics: metrics.clone(), - client_connections_count, - } - } -} - -impl Drop for ProxyConnectionGuard { - fn drop(&mut self) { - let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; - - let metric_labels = LabelsProxy { proxy: &self.proxy_name }; - - self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); - self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); - - debug!( - proxy = self.proxy_name, - connection_id = %self.id, - remote_addr = self.remote_addr.as_ref().map_or("unknown", |v| v.as_str()), - local_addr = self.local_addr.as_ref().map_or("unknown", |v| v.as_str()), - "Client connection closed" - ); - } -} diff --git a/crates/rproxy/src/proxy_http/proxy_http.rs b/crates/rproxy/src/proxy_http/http.rs similarity index 88% rename from crates/rproxy/src/proxy_http/proxy_http.rs rename to crates/rproxy/src/proxy_http/http.rs index 87871b0..65d15c9 100644 --- a/crates/rproxy/src/proxy_http/proxy_http.rs +++ b/crates/rproxy/src/proxy_http/http.rs @@ -88,7 +88,7 @@ where let backend = ProxyHttpBackendEndpoint::new( inner.clone(), - id.clone(), + id, shared.metrics.clone(), config.backend_url(), connections_limit, @@ -102,7 +102,7 @@ where 
.map(|peer_url| { ProxyHttpBackendEndpoint::new( shared.inner(), - id.clone(), + id, shared.metrics.clone(), peer_url.to_owned(), config.backend_max_concurrent_requests(), @@ -114,7 +114,7 @@ where ); let postprocessor = ProxyHttpPostprocessor:: { - worker_id: id.clone(), + worker_id: id, inner: inner.clone(), metrics: shared.metrics.clone(), mirroring_peers: peers.clone(), @@ -132,7 +132,7 @@ where canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { - let listen_address = config.listen_address().clone(); + let listen_address = config.listen_address(); let listener = match Self::listen(&config) { Ok(listener) => listener, @@ -215,7 +215,7 @@ where let handler = server.handle(); let mut resetter = resetter.subscribe(); tokio::spawn(async move { - if let Ok(_) = resetter.recv().await { + if resetter.recv().await.is_ok() { info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); handler.stop(true).await; } @@ -306,8 +306,8 @@ where let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); - let id = info.id.clone(); - let connection_id = info.connection_id.clone(); + let id = info.id; + let connection_id = info.connection_id; let bck_req = this.backend.new_backend_request(&info); let bck_req_body = ProxyHttpRequestBody::new(this.clone(), info, cli_req_body, timestamp); @@ -329,7 +329,7 @@ where .http_proxy_failure_count .get_or_create(&LabelsProxy { proxy: P::name() }) .inc(); - return Ok(HttpResponse::BadGateway().body(format!("Backend error: {:?}", err))); + return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -350,10 +350,10 @@ where } fn postprocess_client_request(&self, req: ProxiedHttpRequest) { - let id = req.info.id.clone(); - let connection_id = req.info.connection_id.clone(); + let id = req.info.id; + let connection_id = req.info.connection_id; - if let Err(_) = self.requests.insert_sync(id, req) { + if self.requests.insert_sync(id, req).is_err() { error!( proxy = P::name(), request_id = %id, @@ -365,17 +365,14 @@ where } fn postprocess_backend_response(&self, bck_res: ProxiedHttpResponse) { - let cli_req = match self.requests.remove_sync(&bck_res.info.id) { - Some((_, req)) => req, - None => { - error!( - proxy = P::name(), - request_id = %bck_res.info.id, - worker_id = %self.id, - "Proxied http response for unmatching request", - ); - return; - } + let Some((_, cli_req)) = self.requests.remove_sync(&bck_res.info.id) else { + error!( + proxy = P::name(), + request_id = %bck_res.info.id, + worker_id = %self.id, + "Proxied http response for unmatching request", + ); + return; }; // hand over to postprocessor asynchronously so that we can return the @@ -575,31 +572,26 @@ where return; } - let message = match message.as_object_mut() { - Some(message) => message, - None => return, + let Some(message) = message.as_object_mut() else { + return; }; - let method = match match message.get_key_value("method") { + let method = (match message.get_key_value("method") { Some((_, method)) => method.as_str(), None => None, - } { - Some(method) => method, - None => "", - } - .to_owned(); + }) + .unwrap_or_default() + .to_string(); - if method != "" { + if !method.is_empty() { // single-shot request - let params = match match message.get_mut("params") { + let Some(params) = match message.get_mut("params") { Some(params) => params, None => return, } - .as_array_mut() - { - Some(params) => params, - None => return, + .as_array_mut() else { + return; }; match method.as_str() { @@ -608,19 
+600,16 @@ where return; } - let execution_payload = match params[1].as_object_mut() { - Some(execution_payload) => execution_payload, - None => return, + let Some(execution_payload) = params[1].as_object_mut() else { + return; }; - let transactions = match match execution_payload.get_mut("transactions") { + let Some(transactions) = match execution_payload.get_mut("transactions") { Some(transactions) => transactions, None => return, } - .as_array_mut() - { - Some(transactions) => transactions, - None => return, + .as_array_mut() else { + return; }; for transaction in transactions { @@ -629,23 +618,20 @@ where } "engine_newPayloadV4" => { - if params.len() < 1 { + if params.is_empty() { return; } - let execution_payload = match params[0].as_object_mut() { - Some(execution_payload) => execution_payload, - None => return, + let Some(execution_payload) = params[0].as_object_mut() else { + return; }; - let transactions = match match execution_payload.get_mut("transactions") { + let Some(transactions) = match execution_payload.get_mut("transactions") { Some(transactions) => transactions, None => return, } - .as_array_mut() - { - Some(transactions) => transactions, - None => return, + .as_array_mut() else { + return; }; for transaction in transactions { @@ -654,23 +640,20 @@ where } "eth_sendBundle" => { - if params.len() < 1 { + if params.is_empty() { return; } - let execution_payload = match params[0].as_object_mut() { - Some(execution_payload) => execution_payload, - None => return, + let Some(execution_payload) = params[0].as_object_mut() else { + return; }; - let transactions = match match execution_payload.get_mut("txs") { + let Some(transactions) = match execution_payload.get_mut("txs") { Some(transactions) => transactions, None => return, } - .as_array_mut() - { - Some(transactions) => transactions, - None => return, + .as_array_mut() else { + return; }; for transaction in transactions { @@ -690,23 +673,21 @@ where } } - let result = match match message.get_mut("result") { + let Some(result) = (match message.get_mut("result") { Some(result) => result.as_object_mut(), None => return, - } { - Some(result) => result, - None => return, + }) else { + return; }; - if let Some(execution_payload) = result.get_mut("executionPayload") { - if let Some(transactions) = execution_payload.get_mut("transactions") { - if let Some(transactions) = transactions.as_array_mut() { - // engine_getPayloadV4 + if let Some(execution_payload) = result.get_mut("executionPayload") && + let Some(transactions) = execution_payload.get_mut("transactions") && + let Some(transactions) = transactions.as_array_mut() + { + // engine_getPayloadV4 - for transaction in transactions { - raw_transaction_to_hash(transaction); - } - } + for transaction in transactions { + raw_transaction_to_hash(transaction); } } } @@ -890,7 +871,7 @@ where fn handle(&mut self, msg: ProxiedHttpCombo, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; let mirroring_peers = self.mirroring_peers.clone(); let mut mirroring_peer_round_robin_index = self.mirroring_peer_round_robin_index.load(Ordering::Relaxed); @@ -1000,7 +981,7 @@ where let start = UtcDateTime::now(); let inner = self.inner.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; let metrics = self.metrics.clone(); let mrr_req = self.new_backend_request(&cli_req.info); @@ -1016,8 +997,7 @@ where Ok(mrr_res_body) => { let size = 
match mrr_res_body.size() { BodySize::Sized(size) => size, // Body is always sized - BodySize::None => 0, - BodySize::Stream => 0, + BodySize::None | BodySize::Stream => 0, }; let info = ProxyHttpResponseInfo::new( cli_req.info.id, @@ -1100,11 +1080,11 @@ impl ProxyHttpRequestInfo { // append remote ip to x-forwarded-for if let Some(peer_addr) = req.connection_info().peer_addr() { let mut forwarded_for = String::new(); - if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) { - if let Ok(ff) = ff.to_str() { - forwarded_for.push_str(ff); - forwarded_for.push_str(", "); - } + if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && + let Ok(ff) = ff.to_str() + { + forwarded_for.push_str(ff); + forwarded_for.push_str(", "); } forwarded_for.push_str(peer_addr); if let Ok(forwarded_for) = HeaderValue::from_str(&forwarded_for) { @@ -1113,21 +1093,19 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if req.connection_info().scheme() != "" { - if None == req.headers().get(header::X_FORWARDED_PROTO) { - if let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) { - headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); - } - } + if req.connection_info().scheme() != "" && + req.headers().get(header::X_FORWARDED_PROTO).is_none() && + let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + { + headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if req.connection_info().scheme() != "" { - if None == req.headers().get(header::X_FORWARDED_HOST) { - if let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().scheme()) { - headers.insert(header::X_FORWARDED_HOST, forwarded_host); - } - } + if req.connection_info().scheme() != "" && + req.headers().get(header::X_FORWARDED_HOST).is_none() && + let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().scheme()) + { + headers.insert(header::X_FORWARDED_HOST, forwarded_host); } // remote address from the guard has port, and connection info has ip @@ -1148,7 +1126,7 @@ impl ProxyHttpRequestInfo { let path_and_query = match req.query_string() { "" => path.clone(), - val => format!("{}?{}", path, val), + val => format!("{path}?{val}"), }; Self { @@ -1165,12 +1143,12 @@ impl ProxyHttpRequestInfo { #[inline] pub(crate) fn id(&self) -> Uuid { - self.id.clone() + self.id } #[inline] pub(crate) fn connection_id(&self) -> Uuid { - self.connection_id.clone() + self.connection_id } #[inline] @@ -1183,12 +1161,12 @@ impl ProxyHttpRequestInfo { } #[inline] - pub fn path_and_query(&self) -> &str { + pub(crate) fn path_and_query(&self) -> &str { &self.path_and_query } #[inline] - pub fn remote_addr(&self) -> &Option { + pub(crate) fn remote_addr(&self) -> &Option { &self.remote_addr } } @@ -1209,7 +1187,7 @@ impl ProxyHttpResponseInfo { #[inline] pub(crate) fn id(&self) -> Uuid { - self.id.clone() + self.id } fn content_encoding(&self) -> String { @@ -1233,7 +1211,7 @@ where info: Option, start: UtcDateTime, - body: Box>, + body: Vec, #[pin] stream: S, @@ -1255,7 +1233,7 @@ where info: Some(info), stream: body, start: timestamp, - body: Box::new(Vec::new()), // TODO: preallocate reasonable size + body: Vec::new(), // TODO: preallocate reasonable size } } } @@ -1306,12 +1284,7 @@ where if let Some(info) = mem::take(this.info) { let proxy = this.proxy.clone(); - let req = ProxiedHttpRequest::new( - info, - mem::take(this.body), - this.start.clone(), - end, - ); + let req = 
ProxiedHttpRequest::new(info, mem::take(this.body), *this.start, end); proxy.postprocess_client_request(req); } @@ -1334,7 +1307,7 @@ where info: Option, start: UtcDateTime, - body: Box>, + body: Vec, #[pin] stream: S, @@ -1357,7 +1330,7 @@ where proxy, stream: body, start: timestamp, - body: Box::new(Vec::new()), // TODO: preallocate reasonable size + body: Vec::new(), // TODO: preallocate reasonable size info: Some(ProxyHttpResponseInfo::new(id, status, headers)), } } @@ -1408,12 +1381,8 @@ where if let Some(info) = mem::take(this.info) { let proxy = this.proxy.clone(); - let res = ProxiedHttpResponse::new( - info, - mem::take(this.body), - this.start.clone(), - end, - ); + let res = + ProxiedHttpResponse::new(info, mem::take(this.body), *this.start, end); proxy.postprocess_backend_response(res); } @@ -1441,14 +1410,14 @@ pub(crate) struct ProxiedHttpRequest { impl ProxiedHttpRequest { pub(crate) fn new( info: ProxyHttpRequestInfo, - body: Box>, + body: Vec, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(*body), + body: Bytes::from(body), size, decompressed_body: Bytes::new(), decompressed_size: 0, @@ -1490,14 +1459,14 @@ pub(crate) struct ProxiedHttpResponse { impl ProxiedHttpResponse { pub(crate) fn new( info: ProxyHttpResponseInfo, - body: Box>, + body: Vec, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(*body), + body: Bytes::from(body), size, decompressed_body: Bytes::new(), decompressed_size: 0, diff --git a/crates/rproxy/src/proxy_http/proxy_http_inner.rs b/crates/rproxy/src/proxy_http/inner.rs similarity index 100% rename from crates/rproxy/src/proxy_http/proxy_http_inner.rs rename to crates/rproxy/src/proxy_http/inner.rs diff --git a/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs b/crates/rproxy/src/proxy_http/inner_authrpc.rs similarity index 91% rename from crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs rename to crates/rproxy/src/proxy_http/inner_authrpc.rs index 831d086..8cf9c8c 100644 --- a/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs +++ b/crates/rproxy/src/proxy_http/inner_authrpc.rs @@ -39,14 +39,13 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { fn should_mirror(jrpc_req: &JrpcRequestMeta) -> bool { let method = jrpc_req.method(); - if true && - !method.starts_with("engine_forkchoiceUpdated") && + if !method.starts_with("engine_forkchoiceUpdated") && !method.starts_with("engine_newPayload") && !method.starts_with("miner_setMaxDASize") { return false; } - return true; + true } match jrpc_req { @@ -58,7 +57,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { return true; } } - return false; + false } } } diff --git a/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs b/crates/rproxy/src/proxy_http/inner_rpc.rs similarity index 95% rename from crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs rename to crates/rproxy/src/proxy_http/inner_rpc.rs index f5da0c8..9be9051 100644 --- a/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs +++ b/crates/rproxy/src/proxy_http/inner_rpc.rs @@ -47,7 +47,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { return false; } - return mirror_errored_requests || jrpc_res.error.is_none() + mirror_errored_requests || jrpc_res.error.is_none() } match jrpc_req { @@ -67,11 +67,11 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { } }; - return should_mirror( + should_mirror( jrpc_req_single, &jrpc_res_single, self.config.mirror_errored_requests, - ); + ) } 
JrpcRequestMetaMaybeBatch::Batch(jrpc_req_batch) => { @@ -106,7 +106,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { return true; } } - return false; + false } } } diff --git a/crates/rproxy/src/proxy_http/mod.rs b/crates/rproxy/src/proxy_http/mod.rs index 93d77b6..6cc59c9 100644 --- a/crates/rproxy/src/proxy_http/mod.rs +++ b/crates/rproxy/src/proxy_http/mod.rs @@ -1,16 +1,11 @@ -mod proxy_http_inner_authrpc; -pub(crate) use proxy_http_inner_authrpc::ProxyHttpInnerAuthrpc; +mod inner_authrpc; +pub(crate) use inner_authrpc::ProxyHttpInnerAuthrpc; -mod proxy_http_inner_rpc; -pub(crate) use proxy_http_inner_rpc::ProxyHttpInnerRpc; +mod inner_rpc; +pub(crate) use inner_rpc::ProxyHttpInnerRpc; -mod proxy_http; -pub(crate) use proxy_http::{ - ProxiedHttpRequest, - ProxiedHttpResponse, - ProxyHttp, - ProxyHttpRequestInfo, -}; +mod http; +pub(crate) use http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttp, ProxyHttpRequestInfo}; -mod proxy_http_inner; -pub(crate) use proxy_http_inner::ProxyHttpInner; +mod inner; +pub(crate) use inner::ProxyHttpInner; diff --git a/crates/rproxy/src/proxy_ws/proxy_ws_flashblocks.rs b/crates/rproxy/src/proxy_ws/flashblocks.rs similarity index 100% rename from crates/rproxy/src/proxy_ws/proxy_ws_flashblocks.rs rename to crates/rproxy/src/proxy_ws/flashblocks.rs diff --git a/crates/rproxy/src/proxy_ws/proxy_ws_inner.rs b/crates/rproxy/src/proxy_ws/inner.rs similarity index 100% rename from crates/rproxy/src/proxy_ws/proxy_ws_inner.rs rename to crates/rproxy/src/proxy_ws/inner.rs diff --git a/crates/rproxy/src/proxy_ws/mod.rs b/crates/rproxy/src/proxy_ws/mod.rs index 770cf87..3abc203 100644 --- a/crates/rproxy/src/proxy_ws/mod.rs +++ b/crates/rproxy/src/proxy_ws/mod.rs @@ -1,8 +1,8 @@ -mod proxy_ws; +mod ws; -mod proxy_ws_flashblocks; +mod flashblocks; -mod proxy_ws_inner; -pub(crate) use proxy_ws::ProxyWs; -pub(crate) use proxy_ws_flashblocks::ProxyWsInnerFlashblocks; -pub(crate) use proxy_ws_inner::ProxyWsInner; +mod inner; +pub(crate) use flashblocks::ProxyWsInnerFlashblocks; +pub(crate) use inner::ProxyWsInner; +pub(crate) use ws::ProxyWs; diff --git a/crates/rproxy/src/proxy_ws/proxy_ws.rs b/crates/rproxy/src/proxy_ws/ws.rs similarity index 90% rename from crates/rproxy/src/proxy_ws/proxy_ws.rs rename to crates/rproxy/src/proxy_ws/ws.rs index 72fdfb8..cf0c701 100644 --- a/crates/rproxy/src/proxy_ws/proxy_ws.rs +++ b/crates/rproxy/src/proxy_ws/ws.rs @@ -46,13 +46,13 @@ use crate::{ const WS_PING_INTERVAL_SECONDS: u64 = 1; -const WS_CLI_ERROR: &'static str = "client error"; -const WS_BCK_ERROR: &'static str = "backend error"; -const WS_BCK_TIMEOUT: &'static str = "backend error"; -const WS_CLOSE_OK: &'static str = ""; +const WS_CLI_ERROR: &str = "client error"; +const WS_BCK_ERROR: &str = "backend error"; +const WS_BCK_TIMEOUT: &str = "backend error"; +const WS_CLOSE_OK: &str = ""; -const WS_LABEL_BACKEND: &'static str = "backend"; -const WS_LABEL_CLIENT: &'static str = "client"; +const WS_LABEL_BACKEND: &str = "backend"; +const WS_LABEL_CLIENT: &str = "client"; // ProxyWs ------------------------------------------------------------- @@ -90,7 +90,7 @@ where let config = shared.config(); - let backend = ProxyWsBackendEndpoint::new(id.clone(), config.backend_url()); + let backend = ProxyWsBackendEndpoint::new(id, config.backend_url()); let postprocessor = ProxyWsPostprocessor:: { inner: shared.inner.clone(), @@ -125,7 +125,7 @@ where canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { - let listen_address = 
config.listen_address().clone(); + let listen_address = config.listen_address(); let listener = match Self::listen(&config) { Ok(listener) => listener, @@ -190,7 +190,7 @@ where let handler = proxy.handle(); let mut resetter = resetter.subscribe(); tokio::spawn(async move { - if let Ok(_) = resetter.recv().await { + if resetter.recv().await.is_ok() { info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); handler.stop(true).await; } @@ -228,6 +228,7 @@ where Ok(socket.into()) } + #[expect(clippy::unused_async, reason = "required by the actix framework")] async fn receive( cli_req: HttpRequest, cli_req_body: web::Payload, @@ -582,7 +583,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // text @@ -619,7 +620,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // ping @@ -634,31 +635,28 @@ where ); return Err(WS_CLI_ERROR); } - return Ok(()); + Ok(()) } // pong actix_ws::Message::Pong(bytes) => { - if let Some(pong) = ProxyWsPing::from_bytes(bytes) { - if let Some((_, ping)) = this.pings.remove_sync(&pong.id) { - if pong == ping { - this.ping_balance_cli.dec(); - this.shared - .metrics - .ws_latency_client - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BACKEND, - }) - .record( - (1000000.0 * - (timestamp - pong.timestamp).as_seconds_f64() / - 2.0) - as i64, - ); - return Ok(()); - } - } + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && + let Some((_, ping)) = this.pings.remove_sync(&pong.id) && + pong == ping + { + this.ping_balance_cli.dec(); + this.shared + .metrics + .ws_latency_client + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_BACKEND, + }) + .record( + (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / + 2.0) as i64, + ); + return Ok(()); } warn!( proxy = P::name(), @@ -666,7 +664,7 @@ where worker_id = %this.id, "Unexpected websocket pong received from client", ); - return Ok(()); + Ok(()) } // close @@ -691,12 +689,10 @@ where ); return Err(WS_BCK_ERROR); } - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } - _ => { - return Ok(()); - } + _ => Ok(()), } } @@ -708,7 +704,7 @@ where error = ?err, "Client websocket stream error" ); - return Err(WS_CLI_ERROR); + Err(WS_CLI_ERROR) } None => { @@ -718,7 +714,7 @@ where worker_id = %this.id, "Client had closed websocket stream" ); - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } } } @@ -760,7 +756,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // text @@ -789,7 +785,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // ping @@ -804,31 +800,28 @@ where ); return Err(WS_BCK_ERROR); } - return Ok(()); + Ok(()) } // pong tungstenite::Message::Pong(bytes) => { - if let Some(pong) = ProxyWsPing::from_bytes(bytes) { - if let Some((_, ping)) = this.pings.remove_sync(&pong.id) { - if pong == ping { - this.ping_balance_bck.dec(); - this.shared - .metrics - .ws_latency_backend - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BACKEND, - }) - .record( - (1000000.0 * - (timestamp - pong.timestamp).as_seconds_f64() / - 2.0) - as i64, - ); - return Ok(()); - } - } + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && + let Some((_, ping)) = this.pings.remove_sync(&pong.id) && + pong == ping + { + this.ping_balance_bck.dec(); + this.shared + .metrics + .ws_latency_backend + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_BACKEND, + }) + .record( + (1000000.0 
* (timestamp - pong.timestamp).as_seconds_f64() / + 2.0) as i64, + ); + return Ok(()); } warn!( proxy = P::name(), @@ -836,7 +829,7 @@ where worker_id = %this.id, "Unexpected websocket pong received from backend", ); - return Ok(()); + Ok(()) } // close @@ -858,12 +851,10 @@ where ); return Err(WS_CLI_ERROR); } - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } - _ => { - return Ok(()); - } + _ => Ok(()), } } @@ -875,7 +866,7 @@ where error = ?err, "Backend websocket stream error" ); - return Err(WS_BCK_ERROR); + Err(WS_BCK_ERROR) } None => { @@ -885,7 +876,7 @@ where worker_id = %this.id, "Backend had closed websocket stream" ); - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } } } @@ -909,7 +900,7 @@ where let json_msg = if config.log_backend_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_slice(&msg).unwrap_or_default(), + serde_json::from_slice(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -931,7 +922,7 @@ where let json_msg = if config.log_backend_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_str(&msg).unwrap_or_default(), + serde_json::from_str(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -953,7 +944,7 @@ where let json_msg = if config.log_client_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_slice(&msg).unwrap_or_default(), + serde_json::from_slice(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -975,7 +966,7 @@ where let json_msg = if config.log_client_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_str(&msg).unwrap_or_default(), + serde_json::from_str(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -1000,15 +991,13 @@ where return message; } - if let Some(object) = message.as_object_mut() { - if let Some(diff) = object.get_mut("diff") { - if let Some(transactions) = diff.get_mut("transactions") { - if let Some(transactions) = transactions.as_array_mut() { - for transaction in transactions { - raw_transaction_to_hash(transaction); - } - } - } + if let Some(object) = message.as_object_mut() && + let Some(diff) = object.get_mut("diff") && + let Some(transactions) = diff.get_mut("transactions") && + let Some(transactions) = transactions.as_array_mut() + { + for transaction in transactions { + raw_transaction_to_hash(transaction); } } @@ -1193,7 +1182,7 @@ where fn handle(&mut self, msg: ProxyWsMessage, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; ctx.spawn( async move { @@ -1267,9 +1256,8 @@ impl ProxyWsPing { let id = Uuid::from_u128(bytes.get_u128()); let connection_id = Uuid::from_u128(bytes.get_u128()); - let timestamp = match UtcDateTime::from_unix_timestamp_nanos(bytes.get_i128()) { - Ok(timestamp) => timestamp, - Err(_) => return None, + let Ok(timestamp) = UtcDateTime::from_unix_timestamp_nanos(bytes.get_i128()) else { + return None; }; Some(Self { id, connection_id, timestamp }) diff --git a/crates/rproxy/src/server/mod.rs b/crates/rproxy/src/server/mod.rs index dc1344f..7315cf7 100644 --- a/crates/rproxy/src/server/mod.rs +++ b/crates/rproxy/src/server/mod.rs @@ -1,2 +1,235 @@ -mod server; -pub use server::Server; +use std::{error::Error, sync::Arc}; + +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::broadcast, + task::JoinHandle, +}; +use 
tokio_util::sync::CancellationToken; +use tracing::{error, info}; + +use crate::{ + circuit_breaker::CircuitBreaker, + config::{Config, ConfigAuthrpc, ConfigFlashblocks, ConfigRpc}, + metrics::Metrics, + proxy::ProxyInner, + proxy_http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc}, + proxy_ws::{ProxyWs, ProxyWsInnerFlashblocks}, + utils::tls_certificate_validity_timestamps, +}; + +// Proxy --------------------------------------------------------------- + +pub struct Server {} + +impl Server { + pub async fn run(config: Config) -> Result<(), Box> { + let canceller = Server::wait_for_shutdown_signal(); + let resetter = Server::wait_for_reset_signal(canceller.clone()); + + // spawn metrics service + let metrics = Arc::new(Metrics::new(config.metrics.clone())); + { + let canceller = canceller.clone(); + let metrics = metrics.clone(); + + tokio::spawn(async move { + metrics.run(canceller).await.inspect_err(|err| { + error!( + service = Metrics::name(), + error = ?err, + "Failed to start metrics service", + ); + std::process::exit(-1); + }) + }); + } + + // spawn circuit-breaker + if !config.circuit_breaker.url.is_empty() { + let canceller = canceller.clone(); + let resetter = resetter.clone(); + + let _ = std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { + Ok(rt) => rt, + Err(err) => { + error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); + std::process::exit(-1); + } + }; + + let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone()); + + tokio::task::LocalSet::new() + .block_on(&rt, async move { circuit_breaker.run(canceller, resetter).await }) + }); + } + + while !canceller.is_cancelled() { + if config.tls.enabled() { + let metrics = metrics.clone(); + let (not_before, not_after) = + tls_certificate_validity_timestamps(config.tls.certificate()); + metrics.tls_certificate_valid_not_before.set(not_before); + metrics.tls_certificate_valid_not_after.set(not_after); + } + + let mut services: Vec>>> = Vec::new(); + + // spawn authrpc proxy + if config.authrpc.enabled { + let tls = config.tls.clone(); + let config = config.authrpc.clone(); + let metrics = metrics.clone(); + let canceller = canceller.clone(); + let resetter = resetter.clone(); + + services.push(tokio::spawn(async move { + ProxyHttp::::run( + config, + tls, + metrics, + canceller.clone(), + resetter, + ) + .await + .inspect_err(|err| { + error!( + proxy = ProxyHttpInnerRpc::name(), + error = ?err, + "Failed to start http-proxy, terminating...", + ); + canceller.cancel(); + }) + })); + } + + // spawn rpc proxy + if config.rpc.enabled { + let tls = config.tls.clone(); + let config = config.rpc.clone(); + let metrics = metrics.clone(); + let canceller = canceller.clone(); + let resetter = resetter.clone(); + + services.push(tokio::spawn(async move { + ProxyHttp::::run( + config, + tls, + metrics, + canceller.clone(), + resetter, + ) + .await + .inspect_err(|err| { + error!( + proxy = ProxyHttpInnerRpc::name(), + error = ?err, + "Failed to start http-proxy, terminating...", + ); + canceller.cancel(); + }) + })); + } + + // spawn flashblocks proxy + if config.flashblocks.enabled { + let tls = config.tls.clone(); + let config = config.flashblocks.clone(); + let metrics = metrics.clone(); + let canceller = canceller.clone(); + let resetter = resetter.clone(); + + services.push(tokio::spawn(async move { + ProxyWs::::run( + config, + tls, + metrics, + canceller.clone(), + resetter, + ) + .await + .inspect_err(|err| 
{ + error!( + proxy = ProxyHttpInnerRpc::name(), + error = ?err, + "Failed to start websocket-proxy, terminating...", + ); + canceller.cancel(); + }) + })); + } + + futures::future::join_all(services).await; + } + + Ok(()) + } + + fn wait_for_shutdown_signal() -> CancellationToken { + let canceller = tokio_util::sync::CancellationToken::new(); + + { + let canceller = canceller.clone(); + + tokio::spawn(async move { + let sigint = async { + signal(SignalKind::interrupt()) + .expect("failed to install sigint handler") + .recv() + .await; + }; + + let sigterm = async { + signal(SignalKind::terminate()) + .expect("failed to install sigterm handler") + .recv() + .await; + }; + + tokio::select! { + _ = sigint => {}, + _ = sigterm => {}, + } + + info!("Shutdown signal received, stopping..."); + + canceller.cancel(); + }); + } + + canceller + } + + fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> { + let (resetter, _) = broadcast::channel::<()>(2); + + { + let resetter = resetter.clone(); + + tokio::spawn(async move { + let mut hangup = + signal(SignalKind::hangup()).expect("failed to install sighup handler"); + + loop { + tokio::select! { + _ = hangup.recv() => { + info!("Hangup signal received, resetting..."); + + if let Err(err) = resetter.send(()) { + error!(from = "sighup", error = ?err, "Failed to broadcast reset signal"); + } + } + + _ = canceller.cancelled() => { + return + }, + } + } + }); + } + + resetter + } +} diff --git a/crates/rproxy/src/server/server.rs b/crates/rproxy/src/server/server.rs deleted file mode 100644 index 12bf1a9..0000000 --- a/crates/rproxy/src/server/server.rs +++ /dev/null @@ -1,238 +0,0 @@ -use std::{error::Error, sync::Arc}; - -use tokio::{ - signal::unix::{SignalKind, signal}, - sync::broadcast, - task::JoinHandle, -}; -use tokio_util::sync::CancellationToken; -use tracing::{error, info}; - -use crate::{ - circuit_breaker::CircuitBreaker, - config::{Config, ConfigAuthrpc, ConfigFlashblocks, ConfigRpc}, - metrics::Metrics, - proxy::ProxyInner, - proxy_http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc}, - proxy_ws::{ProxyWs, ProxyWsInnerFlashblocks}, - utils::tls_certificate_validity_timestamps, -}; - -// Proxy --------------------------------------------------------------- - -pub struct Server {} - -impl Server { - pub async fn run(config: Config) -> Result<(), Box> { - let canceller = Server::wait_for_shutdown_signal(); - let resetter = Server::wait_for_reset_signal(canceller.clone()); - - // spawn metrics service - let metrics = Arc::new(Metrics::new(config.metrics.clone())); - { - let canceller = canceller.clone(); - let metrics = metrics.clone(); - - tokio::spawn(async move { - metrics.run(canceller).await.inspect_err(|err| { - error!( - service = Metrics::name(), - error = ?err, - "Failed to start metrics service", - ); - std::process::exit(-1); - }) - }); - } - - // spawn circuit-breaker - if config.circuit_breaker.url != "" { - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - let _ = std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { - Ok(rt) => rt, - Err(err) => { - error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); - std::process::exit(-1); - } - }; - - let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone()); - - tokio::task::LocalSet::new() - .block_on(&rt, async move { circuit_breaker.run(canceller, resetter).await }) - }); - } - - while !canceller.is_cancelled() { - 
if config.tls.enabled() { - let metrics = metrics.clone(); - let (not_before, not_after) = - tls_certificate_validity_timestamps(config.tls.certificate()); - metrics.tls_certificate_valid_not_before.set(not_before); - metrics.tls_certificate_valid_not_after.set(not_after); - } - - let mut services: Vec>>> = Vec::new(); - - // spawn authrpc proxy - if config.authrpc.enabled { - let tls = config.tls.clone(); - let config = config.authrpc.clone(); - let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - services.push(tokio::spawn(async move { - let res = ProxyHttp::::run( - config, - tls, - metrics, - canceller.clone(), - resetter, - ) - .await - .inspect_err(|err| { - error!( - proxy = ProxyHttpInnerRpc::name(), - error = ?err, - "Failed to start http-proxy, terminating...", - ); - canceller.cancel(); - }); - res - })); - } - - // spawn rpc proxy - if config.rpc.enabled { - let tls = config.tls.clone(); - let config = config.rpc.clone(); - let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - services.push(tokio::spawn(async move { - let res = ProxyHttp::::run( - config, - tls, - metrics, - canceller.clone(), - resetter, - ) - .await - .inspect_err(|err| { - error!( - proxy = ProxyHttpInnerRpc::name(), - error = ?err, - "Failed to start http-proxy, terminating...", - ); - canceller.cancel(); - }); - res - })); - } - - // spawn flashblocks proxy - if config.flashblocks.enabled { - let tls = config.tls.clone(); - let config = config.flashblocks.clone(); - let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - services.push(tokio::spawn(async move { - let res = ProxyWs::::run( - config, - tls, - metrics, - canceller.clone(), - resetter, - ) - .await - .inspect_err(|err| { - error!( - proxy = ProxyHttpInnerRpc::name(), - error = ?err, - "Failed to start websocket-proxy, terminating...", - ); - canceller.cancel(); - }); - res - })); - } - - futures::future::join_all(services).await; - } - - Ok(()) - } - - fn wait_for_shutdown_signal() -> CancellationToken { - let canceller = tokio_util::sync::CancellationToken::new(); - - { - let canceller = canceller.clone(); - - tokio::spawn(async move { - let sigint = async { - signal(SignalKind::interrupt()) - .expect("failed to install sigint handler") - .recv() - .await; - }; - - let sigterm = async { - signal(SignalKind::terminate()) - .expect("failed to install sigterm handler") - .recv() - .await; - }; - - tokio::select! { - _ = sigint => {}, - _ = sigterm => {}, - } - - info!("Shutdown signal received, stopping..."); - - canceller.cancel(); - }); - } - - canceller - } - - fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> { - let (resetter, _) = broadcast::channel::<()>(2); - - { - let resetter = resetter.clone(); - - tokio::spawn(async move { - let mut hangup = - signal(SignalKind::hangup()).expect("failed to install sighup handler"); - - loop { - tokio::select! 
{ - _ = hangup.recv() => { - info!("Hangup signal received, resetting..."); - - if let Err(err) = resetter.send(()) { - error!(from = "sighup", error = ?err, "Failed to broadcast reset signal"); - } - } - - _ = canceller.cancelled() => { - return - }, - } - } - }); - } - - resetter - } -} diff --git a/crates/rproxy/src/utils/utils_compression.rs b/crates/rproxy/src/utils/utils_compression.rs index 3ea8be6..6a21151 100644 --- a/crates/rproxy/src/utils/utils_compression.rs +++ b/crates/rproxy/src/utils/utils_compression.rs @@ -4,7 +4,7 @@ use bytes::Bytes; // decompress ---------------------------------------------------------- -pub fn decompress(body: Bytes, size: usize, content_encoding: String) -> (Bytes, usize) { +pub(crate) fn decompress(body: Bytes, size: usize, content_encoding: String) -> (Bytes, usize) { match content_encoding.to_ascii_lowercase().as_str() { "br" => { let mut decoder = brotli::Decompressor::new(std::io::Cursor::new(body.clone()), 4096); @@ -48,5 +48,5 @@ pub fn decompress(body: Bytes, size: usize, content_encoding: String) -> (Bytes, _ => {} } - return (body.clone(), size); + (body.clone(), size) } diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 7c6a3ed..746c289 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,5 +1,5 @@ // is_hop_by_hop_header ------------------------------------------------ -pub fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { matches!(name.as_str().to_ascii_lowercase().as_str(), "connection" | "host" | "keep-alive") } diff --git a/crates/rproxy/src/utils/utils_loggable.rs b/crates/rproxy/src/utils/utils_loggable.rs index 65c0931..cb4874d 100644 --- a/crates/rproxy/src/utils/utils_loggable.rs +++ b/crates/rproxy/src/utils/utils_loggable.rs @@ -28,11 +28,10 @@ impl valuable::Valuable for Loggable<'_> { fn visit(&self, visitor: &mut dyn valuable::Visit) { match &self.0 { - serde_json::Value::Null => visitor.visit_value(self.as_value()), - serde_json::Value::Bool(_) => visitor.visit_value(self.as_value()), - serde_json::Value::String(_) => visitor.visit_value(self.as_value()), + serde_json::Value::Null | + serde_json::Value::Bool(_) | + serde_json::Value::String(_) | serde_json::Value::Number(_) => visitor.visit_value(self.as_value()), - serde_json::Value::Array(list) => { for val in list { visitor.visit_value(Self(val).as_value()); @@ -53,7 +52,7 @@ impl valuable::Listable for Loggable<'_> { if let serde_json::Value::Array(arr) = &self.0 { return (arr.len(), Some(arr.len())); } - return (0, Some(0)); + (0, Some(0)) } } @@ -62,6 +61,6 @@ impl valuable::Mappable for Loggable<'_> { if let serde_json::Value::Object(obj) = &self.0 { return (obj.len(), Some(obj.len())); } - return (0, Some(0)); + (0, Some(0)) } } diff --git a/crates/rproxy/src/utils/utils_op_stack.rs b/crates/rproxy/src/utils/utils_op_stack.rs index 70a9ba5..8cf53f2 100644 --- a/crates/rproxy/src/utils/utils_op_stack.rs +++ b/crates/rproxy/src/utils/utils_op_stack.rs @@ -3,20 +3,17 @@ use hex::FromHex; // raw_transaction_to_hash --------------------------------------------- -pub fn raw_transaction_to_hash(transaction: &mut serde_json::Value) { - let hex = match transaction.as_str() { - Some(hex) => hex, - None => return, +pub(crate) fn raw_transaction_to_hash(transaction: &mut serde_json::Value) { + let Some(hex) = transaction.as_str() else { + return; }; let hex = 
hex.strip_prefix("0x").unwrap_or(hex); - let bytes = match Vec::from_hex(hex) { - Ok(bytes) => bytes, - Err(_) => return, + let Ok(bytes) = Vec::from_hex(hex) else { + return; }; let mut buf = bytes.as_slice(); - let envelope = match op_alloy_consensus::OpTxEnvelope::decode(&mut buf) { - Ok(envelope) => envelope, - Err(_) => return, + let Ok(envelope) = op_alloy_consensus::OpTxEnvelope::decode(&mut buf) else { + return; }; let hash = envelope.hash().to_string(); *transaction = serde_json::Value::String(hash);