diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 5417f83..5485b0b 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -20,11 +20,10 @@ use crate::{ server::{ metrics::Metrics, proxy::{ - ProxyInner, circuit_breaker::CircuitBreaker, config::{ConfigAuthrpc, ConfigFlashblocks, ConfigRpc}, - http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc}, - ws::{ProxyWs, ProxyWsInnerFlashblocks}, + http::{ProxyHttp, ProxyHttpInner, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc}, + ws::{ProxyWs, ProxyWsInner, ProxyWsInnerFlashblocks}, }, }, utils::tls_certificate_validity_timestamps, @@ -108,7 +107,7 @@ impl Server { .await .inspect_err(|err| { error!( - proxy = ProxyHttpInnerRpc::name(), + proxy = ProxyHttpInnerAuthrpc::name(), error = ?err, "Failed to start http-proxy, terminating...", ); @@ -164,7 +163,7 @@ impl Server { .await .inspect_err(|err| { error!( - proxy = ProxyHttpInnerRpc::name(), + proxy = ProxyWsInnerFlashblocks::name(), error = ?err, "Failed to start websocket-proxy, terminating...", ); diff --git a/crates/rproxy/src/server/proxy.rs b/crates/rproxy/src/server/proxy/connection_guard.rs similarity index 73% rename from crates/rproxy/src/server/proxy.rs rename to crates/rproxy/src/server/proxy/connection_guard.rs index 0fb9ae4..2e82417 100644 --- a/crates/rproxy/src/server/proxy.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -1,10 +1,3 @@ -pub(crate) mod circuit_breaker; -pub(crate) mod config; -pub(crate) mod http; -pub(crate) mod ws; - -// --------------------------------------------------------------------- - use std::{ any::Any, sync::{ @@ -19,20 +12,46 @@ use uuid::Uuid; use crate::server::metrics::{LabelsProxy, Metrics}; -// Proxy --------------------------------------------------------------- +// ProxyConnectionGuard ------------------------------------------------ + +pub(crate) struct ConnectionGuard { + pub id: Uuid, + pub remote_addr: Option, + pub local_addr: Option, + + proxy: &'static str, + metrics: Arc, + client_connections_count: Arc, +} + +impl ConnectionGuard { + fn new( + id: Uuid, + proxy: &'static str, + remote_addr: Option, + local_addr: Option, + metrics: Arc, + client_connections_count: Arc, + ) -> Self { + Self { + id, + remote_addr, + local_addr, + proxy, + metrics: metrics.clone(), + client_connections_count, + } + } -pub(crate) trait Proxy

-where - P: ProxyInner, -{ - fn on_connect( + pub(crate) fn on_connect( + proxy: &'static str, metrics: Arc, client_connections_count: Arc, ) -> impl Fn(&dyn Any, &mut Extensions) { move |connection, extensions| { { let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; - let metric_labels = LabelsProxy { proxy: P::name() }; + let metric_labels = LabelsProxy { proxy }; metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); metrics.client_connections_established_count.get_or_create(&metric_labels).inc(); @@ -54,32 +73,32 @@ where let remote_addr = match stream.peer_addr() { Ok(local_addr) => Some(local_addr.to_string()), Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); + warn!(proxy = proxy, error = ?err, "Failed to get remote address"); None } }; let local_addr = match stream.local_addr() { Ok(local_addr) => Some(local_addr.to_string()), Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); + warn!(proxy = proxy, error = ?err, "Failed to get remote address"); None } }; debug!( - proxy = P::name(), + proxy = proxy, connection_id = %id, remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()), local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()), "Client connection open" ); - extensions.insert(ProxyConnectionGuard::new( + extensions.insert(ConnectionGuard::new( id, - P::name(), + proxy, remote_addr, local_addr, - &metrics, + metrics.clone(), client_connections_count.clone(), )); } @@ -87,55 +106,17 @@ where } } -// ProxyInner ---------------------------------------------------------- - -pub(crate) trait ProxyInner: 'static { - fn name() -> &'static str; -} - -// ProxyConnectionGuard ------------------------------------------------ - -pub(crate) struct ProxyConnectionGuard { - pub id: Uuid, - pub remote_addr: Option, - pub local_addr: Option, - - proxy_name: &'static str, - metrics: Arc, - client_connections_count: Arc, -} - -impl ProxyConnectionGuard { - fn new( - id: Uuid, - proxy_name: &'static str, - remote_addr: Option, - local_addr: Option, - metrics: &Arc, - client_connections_count: Arc, - ) -> Self { - Self { - id, - remote_addr, - local_addr, - proxy_name, - metrics: metrics.clone(), - client_connections_count, - } - } -} - -impl Drop for ProxyConnectionGuard { +impl Drop for ConnectionGuard { fn drop(&mut self) { let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; - let metric_labels = LabelsProxy { proxy: self.proxy_name }; + let metric_labels = LabelsProxy { proxy: self.proxy }; self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); debug!( - proxy = self.proxy_name, + proxy = self.proxy, connection_id = %self.id, remote_addr = self.remote_addr.as_ref().map_or("unknown", |v| v.as_str()), local_addr = self.local_addr.as_ref().map_or("unknown", |v| v.as_str()), diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 6d45ff8..4cf14e7 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -1,7 +1,6 @@ use crate::{ jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch}, server::proxy::{ - ProxyInner, config::ConfigAuthrpc, http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner}, }, @@ -16,14 +15,12 @@ pub(crate) struct ProxyHttpInnerAuthrpc { config: ConfigAuthrpc, } -impl ProxyInner for ProxyHttpInnerAuthrpc { +impl ProxyHttpInner for ProxyHttpInnerAuthrpc { #[inline] fn name() -> &'static str { PROXY_HTTP_INNER_AUTHRPC_NAME } -} -impl ProxyHttpInner for ProxyHttpInnerAuthrpc { fn new(config: ConfigAuthrpc) -> Self { Self { config } } diff --git a/crates/rproxy/src/server/proxy/http/inner.rs b/crates/rproxy/src/server/proxy/http/inner.rs index dda0bc8..2447aa9 100644 --- a/crates/rproxy/src/server/proxy/http/inner.rs +++ b/crates/rproxy/src/server/proxy/http/inner.rs @@ -1,18 +1,15 @@ use crate::{ jrpc::JrpcRequestMetaMaybeBatch, - server::proxy::{ - ProxyInner, - http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp}, - }, + server::proxy::http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp}, }; // ProxyHttpInner ------------------------------------------------------ -pub(crate) trait ProxyHttpInner: - ProxyInner + Clone + Send + Sized + Sync + 'static +pub(crate) trait ProxyHttpInner: Clone + Send + Sized + Sync + 'static where C: ConfigProxyHttp, { + fn name() -> &'static str; fn new(config: C) -> Self; fn config(&self) -> &C; diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 6b51b97..2e3b6f0 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -52,8 +52,7 @@ use crate::{ server::{ metrics::{LabelsProxy, LabelsProxyClientInfo, LabelsProxyHttpJrpc, Metrics}, proxy::{ - Proxy, - ProxyConnectionGuard, + ConnectionGuard, config::ConfigTls, http::{ ProxyHttpInner, @@ -191,7 +190,7 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect(metrics, client_connections_count)) + .on_connect(ConnectionGuard::on_connect(P::name(), metrics, client_connections_count)) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -313,7 +312,7 @@ where .inc(); } - let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); + let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); let id = info.id; let connection_id = info.connection_id; @@ -768,13 +767,6 @@ where } } -impl Proxy

for ProxyHttp -where - C: ConfigProxyHttp, - P: ProxyHttpInner, -{ -} - impl Drop for ProxyHttp where C: ConfigProxyHttp, @@ -1069,7 +1061,7 @@ pub(crate) struct ProxyHttpRequestInfo { } impl ProxyHttpRequestInfo { - pub(crate) fn new(req: &HttpRequest, guard: Option<&ProxyConnectionGuard>) -> Self { + pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { diff --git a/crates/rproxy/src/server/proxy/http/rpc.rs b/crates/rproxy/src/server/proxy/http/rpc.rs index fb80257..b097da1 100644 --- a/crates/rproxy/src/server/proxy/http/rpc.rs +++ b/crates/rproxy/src/server/proxy/http/rpc.rs @@ -3,7 +3,6 @@ use tracing::warn; use crate::{ jrpc::{JrpcError, JrpcRequestMeta, JrpcRequestMetaMaybeBatch, JrpcResponseMeta}, server::proxy::{ - ProxyInner, config::ConfigRpc, http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner}, }, @@ -18,14 +17,12 @@ pub(crate) struct ProxyHttpInnerRpc { config: ConfigRpc, } -impl ProxyInner for ProxyHttpInnerRpc { +impl ProxyHttpInner for ProxyHttpInnerRpc { #[inline] fn name() -> &'static str { PROXY_HTTP_INNER_RPC_NAME } -} -impl ProxyHttpInner for ProxyHttpInnerRpc { fn new(config: ConfigRpc) -> Self { Self { config } } diff --git a/crates/rproxy/src/server/proxy/mod.rs b/crates/rproxy/src/server/proxy/mod.rs new file mode 100644 index 0000000..0683f60 --- /dev/null +++ b/crates/rproxy/src/server/proxy/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod circuit_breaker; +pub(crate) mod config; +pub(crate) mod http; +pub(crate) mod ws; + +pub(crate) mod connection_guard; +use connection_guard::ConnectionGuard; diff --git a/crates/rproxy/src/server/proxy/ws/flashblocks.rs b/crates/rproxy/src/server/proxy/ws/flashblocks.rs index 6af6476..ebe93f0 100644 --- a/crates/rproxy/src/server/proxy/ws/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/ws/flashblocks.rs @@ -1,4 +1,4 @@ -use crate::server::proxy::{ProxyInner, config::ConfigFlashblocks, ws::ProxyWsInner}; +use crate::server::proxy::{config::ConfigFlashblocks, ws::ProxyWsInner}; const PROXY_WS_FLASHBLOCKS_RPC_NAME: &str = "rproxy-flashblocks"; // ProxyWsInnerFlashblocks --------------------------------------------- @@ -8,13 +8,12 @@ pub(crate) struct ProxyWsInnerFlashblocks { config: ConfigFlashblocks, } -impl ProxyInner for ProxyWsInnerFlashblocks { +impl ProxyWsInner for ProxyWsInnerFlashblocks { + #[inline] fn name() -> &'static str { PROXY_WS_FLASHBLOCKS_RPC_NAME } -} -impl ProxyWsInner for ProxyWsInnerFlashblocks { fn new(config: ConfigFlashblocks) -> Self { Self { config } } diff --git a/crates/rproxy/src/server/proxy/ws/inner.rs b/crates/rproxy/src/server/proxy/ws/inner.rs index c655eea..5c3e3a7 100644 --- a/crates/rproxy/src/server/proxy/ws/inner.rs +++ b/crates/rproxy/src/server/proxy/ws/inner.rs @@ -1,11 +1,12 @@ -use crate::server::proxy::{ProxyInner, ws::config::ConfigProxyWs}; +use crate::server::proxy::ws::config::ConfigProxyWs; // ProxyWsInner -------------------------------------------------------- -pub(crate) trait ProxyWsInner: ProxyInner + Clone + Send + Sync +pub(crate) trait ProxyWsInner: Clone + Send + Sync + 'static where C: ConfigProxyWs, { + fn name() -> &'static str; fn new(config: C) -> Self; fn config(&self) -> &C; } diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index e7cb593..2dd10c4 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -40,8 +40,7 @@ use crate::{ server::{ metrics::{LabelsProxyWs, Metrics}, proxy::{ - Proxy, - ProxyConnectionGuard, + ConnectionGuard, config::ConfigTls, http::ProxyHttpRequestInfo, ws::{ProxyWsInner, config::ConfigProxyWs}, @@ -175,7 +174,7 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect(metrics, client_connections_count)) + .on_connect(ConnectionGuard::on_connect(P::name(), metrics, client_connections_count)) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -248,7 +247,7 @@ where cli_req_body: web::Payload, this: web::Data, ) -> Result { - let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); + let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); let (res, cli_tx, cli_rx) = match actix_ws::handle(&cli_req, cli_req_body) { Ok(res) => res, @@ -1097,13 +1096,6 @@ where } } -impl Proxy

for ProxyWs -where - C: ConfigProxyWs, - P: ProxyWsInner, -{ -} - // ProxyWsSharedState -------------------------------------------------- #[derive(Clone)]