Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions crates/rproxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use crate::{
server::{
metrics::Metrics,
proxy::{
ProxyInner,
circuit_breaker::CircuitBreaker,
config::{ConfigAuthrpc, ConfigFlashblocks, ConfigRpc},
http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc},
ws::{ProxyWs, ProxyWsInnerFlashblocks},
http::{ProxyHttp, ProxyHttpInner, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc},
ws::{ProxyWs, ProxyWsInner, ProxyWsInnerFlashblocks},
},
},
utils::tls_certificate_validity_timestamps,
Expand Down Expand Up @@ -108,7 +107,7 @@ impl Server {
.await
.inspect_err(|err| {
error!(
proxy = ProxyHttpInnerRpc::name(),
proxy = ProxyHttpInnerAuthrpc::name(),
error = ?err,
"Failed to start http-proxy, terminating...",
);
Expand Down Expand Up @@ -164,7 +163,7 @@ impl Server {
.await
.inspect_err(|err| {
error!(
proxy = ProxyHttpInnerRpc::name(),
proxy = ProxyWsInnerFlashblocks::name(),
error = ?err,
"Failed to start websocket-proxy, terminating...",
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
pub(crate) mod circuit_breaker;
pub(crate) mod config;
pub(crate) mod http;
pub(crate) mod ws;

// ---------------------------------------------------------------------

use std::{
any::Any,
sync::{
Expand All @@ -19,20 +12,46 @@ use uuid::Uuid;

use crate::server::metrics::{LabelsProxy, Metrics};

// Proxy ---------------------------------------------------------------
// ProxyConnectionGuard ------------------------------------------------

pub(crate) struct ConnectionGuard {
pub id: Uuid,
pub remote_addr: Option<String>,
pub local_addr: Option<String>,

proxy: &'static str,
metrics: Arc<Metrics>,
client_connections_count: Arc<AtomicI64>,
}

impl ConnectionGuard {
fn new(
id: Uuid,
proxy: &'static str,
remote_addr: Option<String>,
local_addr: Option<String>,
metrics: Arc<Metrics>,
client_connections_count: Arc<AtomicI64>,
) -> Self {
Self {
id,
remote_addr,
local_addr,
proxy,
metrics: metrics.clone(),
client_connections_count,
}
}

pub(crate) trait Proxy<P>
where
P: ProxyInner,
{
fn on_connect(
pub(crate) fn on_connect(
proxy: &'static str,
metrics: Arc<Metrics>,
client_connections_count: Arc<AtomicI64>,
) -> impl Fn(&dyn Any, &mut Extensions) {
move |connection, extensions| {
{
let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1;
let metric_labels = LabelsProxy { proxy: P::name() };
let metric_labels = LabelsProxy { proxy };

metrics.client_connections_active_count.get_or_create(&metric_labels).set(val);
metrics.client_connections_established_count.get_or_create(&metric_labels).inc();
Expand All @@ -54,88 +73,50 @@ where
let remote_addr = match stream.peer_addr() {
Ok(local_addr) => Some(local_addr.to_string()),
Err(err) => {
warn!(proxy = P::name(), error = ?err, "Failed to get remote address");
warn!(proxy = proxy, error = ?err, "Failed to get remote address");
None
}
};
let local_addr = match stream.local_addr() {
Ok(local_addr) => Some(local_addr.to_string()),
Err(err) => {
warn!(proxy = P::name(), error = ?err, "Failed to get remote address");
warn!(proxy = proxy, error = ?err, "Failed to get remote address");
None
}
};

debug!(
proxy = P::name(),
proxy = proxy,
connection_id = %id,
remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()),
local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()),
"Client connection open"
);

extensions.insert(ProxyConnectionGuard::new(
extensions.insert(ConnectionGuard::new(
id,
P::name(),
proxy,
remote_addr,
local_addr,
&metrics,
metrics.clone(),
client_connections_count.clone(),
));
}
}
}
}

// ProxyInner ----------------------------------------------------------

pub(crate) trait ProxyInner: 'static {
fn name() -> &'static str;
}

// ProxyConnectionGuard ------------------------------------------------

pub(crate) struct ProxyConnectionGuard {
pub id: Uuid,
pub remote_addr: Option<String>,
pub local_addr: Option<String>,

proxy_name: &'static str,
metrics: Arc<Metrics>,
client_connections_count: Arc<AtomicI64>,
}

impl ProxyConnectionGuard {
fn new(
id: Uuid,
proxy_name: &'static str,
remote_addr: Option<String>,
local_addr: Option<String>,
metrics: &Arc<Metrics>,
client_connections_count: Arc<AtomicI64>,
) -> Self {
Self {
id,
remote_addr,
local_addr,
proxy_name,
metrics: metrics.clone(),
client_connections_count,
}
}
}

impl Drop for ProxyConnectionGuard {
impl Drop for ConnectionGuard {
fn drop(&mut self) {
let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1;

let metric_labels = LabelsProxy { proxy: self.proxy_name };
let metric_labels = LabelsProxy { proxy: self.proxy };

self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val);
self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc();

debug!(
proxy = self.proxy_name,
proxy = self.proxy,
connection_id = %self.id,
remote_addr = self.remote_addr.as_ref().map_or("unknown", |v| v.as_str()),
local_addr = self.local_addr.as_ref().map_or("unknown", |v| v.as_str()),
Expand Down
5 changes: 1 addition & 4 deletions crates/rproxy/src/server/proxy/http/authrpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch},
server::proxy::{
ProxyInner,
config::ConfigAuthrpc,
http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner},
},
Expand All @@ -16,14 +15,12 @@ pub(crate) struct ProxyHttpInnerAuthrpc {
config: ConfigAuthrpc,
}

impl ProxyInner for ProxyHttpInnerAuthrpc {
impl ProxyHttpInner<ConfigAuthrpc> for ProxyHttpInnerAuthrpc {
#[inline]
fn name() -> &'static str {
PROXY_HTTP_INNER_AUTHRPC_NAME
}
}

impl ProxyHttpInner<ConfigAuthrpc> for ProxyHttpInnerAuthrpc {
fn new(config: ConfigAuthrpc) -> Self {
Self { config }
}
Expand Down
9 changes: 3 additions & 6 deletions crates/rproxy/src/server/proxy/http/inner.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use crate::{
jrpc::JrpcRequestMetaMaybeBatch,
server::proxy::{
ProxyInner,
http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp},
},
server::proxy::http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp},
};

// ProxyHttpInner ------------------------------------------------------

pub(crate) trait ProxyHttpInner<C>:
ProxyInner + Clone + Send + Sized + Sync + 'static
pub(crate) trait ProxyHttpInner<C>: Clone + Send + Sized + Sync + 'static
where
C: ConfigProxyHttp,
{
fn name() -> &'static str;
fn new(config: C) -> Self;
fn config(&self) -> &C;

Expand Down
16 changes: 4 additions & 12 deletions crates/rproxy/src/server/proxy/http/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ use crate::{
server::{
metrics::{LabelsProxy, LabelsProxyClientInfo, LabelsProxyHttpJrpc, Metrics},
proxy::{
Proxy,
ProxyConnectionGuard,
ConnectionGuard,
config::ConfigTls,
http::{
ProxyHttpInner,
Expand Down Expand Up @@ -191,7 +190,7 @@ where
.wrap(NormalizePath::new(TrailingSlash::Trim))
.default_service(web::route().to(Self::receive))
})
.on_connect(Self::on_connect(metrics, client_connections_count))
.on_connect(ConnectionGuard::on_connect(P::name(), metrics, client_connections_count))
.shutdown_signal(canceller.cancelled_owned())
.workers(workers_count);

Expand Down Expand Up @@ -313,7 +312,7 @@ where
.inc();
}

let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::<ProxyConnectionGuard>());
let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::<ConnectionGuard>());

let id = info.id;
let connection_id = info.connection_id;
Expand Down Expand Up @@ -768,13 +767,6 @@ where
}
}

impl<C, P> Proxy<P> for ProxyHttp<C, P>
where
C: ConfigProxyHttp,
P: ProxyHttpInner<C>,
{
}

impl<C, P> Drop for ProxyHttp<C, P>
where
C: ConfigProxyHttp,
Expand Down Expand Up @@ -1069,7 +1061,7 @@ pub(crate) struct ProxyHttpRequestInfo {
}

impl ProxyHttpRequestInfo {
pub(crate) fn new(req: &HttpRequest, guard: Option<&ProxyConnectionGuard>) -> Self {
pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self {
// copy over only non hop-by-hop headers
let mut headers = HeaderMap::new();
for (header, value) in req.headers().iter() {
Expand Down
5 changes: 1 addition & 4 deletions crates/rproxy/src/server/proxy/http/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use tracing::warn;
use crate::{
jrpc::{JrpcError, JrpcRequestMeta, JrpcRequestMetaMaybeBatch, JrpcResponseMeta},
server::proxy::{
ProxyInner,
config::ConfigRpc,
http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner},
},
Expand All @@ -18,14 +17,12 @@ pub(crate) struct ProxyHttpInnerRpc {
config: ConfigRpc,
}

impl ProxyInner for ProxyHttpInnerRpc {
impl ProxyHttpInner<ConfigRpc> for ProxyHttpInnerRpc {
#[inline]
fn name() -> &'static str {
PROXY_HTTP_INNER_RPC_NAME
}
}

impl ProxyHttpInner<ConfigRpc> for ProxyHttpInnerRpc {
fn new(config: ConfigRpc) -> Self {
Self { config }
}
Expand Down
7 changes: 7 additions & 0 deletions crates/rproxy/src/server/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub(crate) mod circuit_breaker;
pub(crate) mod config;
pub(crate) mod http;
pub(crate) mod ws;

pub(crate) mod connection_guard;
use connection_guard::ConnectionGuard;
7 changes: 3 additions & 4 deletions crates/rproxy/src/server/proxy/ws/flashblocks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::server::proxy::{ProxyInner, config::ConfigFlashblocks, ws::ProxyWsInner};
use crate::server::proxy::{config::ConfigFlashblocks, ws::ProxyWsInner};
const PROXY_WS_FLASHBLOCKS_RPC_NAME: &str = "rproxy-flashblocks";

// ProxyWsInnerFlashblocks ---------------------------------------------
Expand All @@ -8,13 +8,12 @@ pub(crate) struct ProxyWsInnerFlashblocks {
config: ConfigFlashblocks,
}

impl ProxyInner for ProxyWsInnerFlashblocks {
impl ProxyWsInner<ConfigFlashblocks> for ProxyWsInnerFlashblocks {
#[inline]
fn name() -> &'static str {
PROXY_WS_FLASHBLOCKS_RPC_NAME
}
}

impl ProxyWsInner<ConfigFlashblocks> for ProxyWsInnerFlashblocks {
fn new(config: ConfigFlashblocks) -> Self {
Self { config }
}
Expand Down
5 changes: 3 additions & 2 deletions crates/rproxy/src/server/proxy/ws/inner.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::server::proxy::{ProxyInner, ws::config::ConfigProxyWs};
use crate::server::proxy::ws::config::ConfigProxyWs;

// ProxyWsInner --------------------------------------------------------

pub(crate) trait ProxyWsInner<C>: ProxyInner + Clone + Send + Sync
pub(crate) trait ProxyWsInner<C>: Clone + Send + Sync + 'static
where
C: ConfigProxyWs,
{
fn name() -> &'static str;
fn new(config: C) -> Self;
fn config(&self) -> &C;
}
Loading