From 0416399e23ba1bfdefc96c034ad9a36da117da22 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Mon, 27 Oct 2025 20:22:17 +0100 Subject: [PATCH 1/3] feat: enable fcu deduplication --- Cargo.lock | 175 ++++++++++++- Cargo.toml | 4 +- crates/rproxy/Cargo.toml | 2 + crates/rproxy/src/jrpc/jrpc_request.rs | 25 +- .../rproxy/src/server/proxy/config/authrpc.rs | 10 + .../rproxy/src/server/proxy/http/authrpc.rs | 79 +++++- crates/rproxy/src/server/proxy/http/inner.rs | 11 + crates/rproxy/src/server/proxy/http/proxy.rs | 237 +++++++++++++++--- crates/rproxy/src/server/proxy/http/rpc.rs | 13 +- readme.md | 9 +- 10 files changed, 519 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec719f8..1cd6ce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,9 +299,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.0.36" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cd9d29a6a0bb8d4832ff7685dcbb430011b832f2ccec1af9571a0e75c1f7e9c" +checksum = "b9b151e38e42f1586a01369ec52a6934702731d07e8509a7307331b09f6c46dc" dependencies = [ "alloy-eips", "alloy-primitives", @@ -360,9 +360,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "1.0.36" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bfec530782b30151e2564edf3c900f1fa6852128b7a993e458e8e3815d8b915" +checksum = "e5434834adaf64fa20a6fb90877bc1d33214c41b055cc49f82189c98614368cc" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -380,6 +380,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "alloy-json-rpc" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c69f6c9c68a1287c9d5ff903d0010726934de0dac10989be37b75a29190d55" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "http 1.3.1", + "serde", + "serde_json", + "thiserror", + "tracing", +] + [[package]] name = "alloy-primitives" version = "1.4.0" @@ -431,15 +446,73 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "1.0.36" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19c3835bdc128f2f3418f5d6c76aec63a245d72973e0eaacc9720aa0787225c5" +checksum = "64600fc6c312b7e0ba76f73a381059af044f4f21f43e07f51f1fa76c868fe302" dependencies = [ "alloy-primitives", "serde", "serde_json", ] +[[package]] +name = "alloy-sol-macro" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ce480400051b5217f19d6e9a82d9010cdde20f1ae9c00d53591e4a1afbb312" +dependencies = [ + "alloy-sol-macro-expander", + "alloy-sol-macro-input", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "alloy-sol-macro-expander" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d792e205ed3b72f795a8044c52877d2e6b6e9b1d13f431478121d8d4eaa9028" +dependencies = [ + "alloy-sol-macro-input", + "const-hex", + "heck", + "indexmap 2.11.4", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.106", + "syn-solidity", + "tiny-keccak", +] + +[[package]] +name = "alloy-sol-macro-input" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd1247a8f90b465ef3f1207627547ec16940c35597875cdc09c49d58b19693c" +dependencies = [ + "const-hex", + "dunce", + "heck", + "macro-string", + "proc-macro2", + "quote", + "syn 2.0.106", + "syn-solidity", +] + +[[package]] +name = "alloy-sol-types" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c8a9a909872097caffc05df134e5ef2253a1cdb56d3a9cf0052a042ac763f9" +dependencies = [ + "alloy-primitives", + "alloy-sol-macro", +] + [[package]] name = "alloy-trie" version = "0.9.1" @@ -458,9 +531,9 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.0.36" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc79013f9ac3a8ddeb60234d43da09e6d6abfc1c9dd29d3fe97adfbece3f4a08" +checksum = "f8e52276fdb553d3c11563afad2898f4085165e4093604afe3d78b69afbf408f" dependencies = [ "alloy-primitives", "darling", @@ -1234,6 +1307,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2250,6 +2332,17 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "macro-string" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b27834086c65ec3f9387b096d66e99f221cf081c2b738042aa252bcd41204e3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "matchers" version = "0.2.0" @@ -2298,6 +2391,24 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "moka" +version = "0.12.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "parking_lot", + "portable-atomic", + "rustc_version 0.4.1", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "no-std-net" version = "0.6.0" @@ -2639,6 +2750,12 @@ dependencies = [ "pnet_sys", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "potential_utf" version = "0.1.3" @@ -2693,6 +2810,28 @@ dependencies = [ "toml_edit", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "proc-macro2" version = "1.0.101" @@ -2947,6 +3086,7 @@ dependencies = [ "actix-tls", "actix-web", "actix-ws", + "alloy-json-rpc", "alloy-primitives", "alloy-rlp", "awc", @@ -2961,6 +3101,7 @@ dependencies = [ "hex", "http 1.3.1", "humantime", + "moka", "op-alloy-consensus", "parking_lot", "pin-project", @@ -3509,6 +3650,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn-solidity" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff790eb176cc81bb8936aed0f7b9f14fc4670069a2d371b3e3b0ecce908b2cb3" +dependencies = [ + "paste", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -3520,6 +3673,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 74148d6..1cee3b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ unreachable_pub = "deny" [workspace.lints.clippy] +manual_let_else = "warn" match_same_arms = "warn" -unused_async = "warn" uninlined_format_args = "warn" -manual_let_else = "warn" +unused_async = "warn" diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 6abb033..6aa61cf 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -17,6 +17,7 @@ actix-http = { version = "3.11.1", features = ["ws"] } actix-tls = "3.4.0" actix-web = { version = "4.11.0", features = ["rustls-0_23"] } actix-ws = "0.3.0" +alloy-json-rpc = "1.0.41" alloy-primitives = "1.3.1" alloy-rlp = "0.3.12" awc = "3.7.0" @@ -31,6 +32,7 @@ futures-core = "0.3.31" hex = "0.4.3" http = "1.3.1" humantime = "2.2.0" +moka = { version = "0.12.11", features = ["sync"] } op-alloy-consensus = "0.20.0" parking_lot = "0.12.4" pin-project = "1.1.10" diff --git a/crates/rproxy/src/jrpc/jrpc_request.rs b/crates/rproxy/src/jrpc/jrpc_request.rs index fba69a7..f1aa59d 100644 --- a/crates/rproxy/src/jrpc/jrpc_request.rs +++ b/crates/rproxy/src/jrpc/jrpc_request.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; +use alloy_json_rpc::Id; use serde::Deserialize; // JrpcRequestMeta ----------------------------------------------------- @@ -12,11 +13,20 @@ const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> = Cow::Borrowed("engine_forkchoiceUpdatedV3_withPayload"); pub(crate) struct JrpcRequestMeta { + id: Id, + method: Cow<'static, str>, method_enriched: Cow<'static, str>, + + params: Vec, } impl JrpcRequestMeta { + #[inline] + pub(crate) fn id(&self) -> &Id { + &self.id + } + #[inline] pub(crate) fn method(&self) -> Cow<'static, str> { self.method.clone() @@ -26,6 +36,11 @@ impl JrpcRequestMeta { pub(crate) fn method_enriched(&self) -> Cow<'static, str> { self.method_enriched.clone() } + + #[inline] + pub(crate) fn params(&self) -> &Vec { + &self.params + } } impl<'a> Deserialize<'a> for JrpcRequestMeta { @@ -35,6 +50,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta { { #[derive(Deserialize)] struct JrpcRequestMetaWire { + id: Id, method: Cow<'static, str>, params: Vec, } @@ -49,7 +65,12 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta { } if params_count < 2 { - return Ok(Self { method: wire.method.clone(), method_enriched: wire.method.clone() }); + return Ok(Self { + id: wire.id, + method: wire.method.clone(), + method_enriched: wire.method.clone(), + params: wire.params, + }); } let method_enriched = match wire.method.as_ref() { @@ -60,7 +81,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta { _ => wire.method.clone(), }; - Ok(Self { method: wire.method, method_enriched }) + Ok(Self { id: wire.id, method: wire.method, method_enriched, params: wire.params }) } } diff --git a/crates/rproxy/src/server/proxy/config/authrpc.rs b/crates/rproxy/src/server/proxy/config/authrpc.rs index 90accb3..054bd28 100644 --- a/crates/rproxy/src/server/proxy/config/authrpc.rs +++ b/crates/rproxy/src/server/proxy/config/authrpc.rs @@ -52,6 +52,16 @@ pub(crate) struct ConfigAuthrpc { )] pub(crate) backend_timeout: Duration, + /// whether authrpc proxy should deduplicate incoming fcus w/o payload + /// (mitigates fcu avalanche issue) + #[arg( + env = "RPROXY_AUTHRPC_DEDUPLICATE_FCUS_WO_PAYLOAD", + help_heading = "authrpc", + long("authrpc-deduplicate-fcus-wo-payload"), + name("authrpc_deduplicate_fcus_wo_payload") + )] + pub(crate) deduplicate_fcus_wo_payload: bool, + /// enable authrpc proxy #[arg( env = "RPROXY_AUTHRPC_ENABLED", diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 4cf14e7..79168eb 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -1,3 +1,9 @@ +use std::time::Duration; + +use actix_http::header; +use actix_web::HttpResponse; +use moka::sync::Cache; + use crate::{ jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch}, server::proxy::{ @@ -7,12 +13,14 @@ use crate::{ }; const PROXY_HTTP_INNER_AUTHRPC_NAME: &str = "rproxy-authrpc"; +const HASH_LEN: usize = 66; // 2 (for 0x) + 64 (for 32 bytes) // ProxyHttpInnerAuthrpc ----------------------------------------------- #[derive(Clone)] pub(crate) struct ProxyHttpInnerAuthrpc { config: ConfigAuthrpc, + fcu_cache: Cache<[u8; 3 * HASH_LEN], ()>, // head + safe + finalised } impl ProxyHttpInner for ProxyHttpInnerAuthrpc { @@ -22,13 +30,25 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { } fn new(config: ConfigAuthrpc) -> Self { - Self { config } + Self { + config, + fcu_cache: Cache::builder() + .time_to_live(Duration::from_mins(1)) + .max_capacity(4096) + .build(), + } } + #[inline] fn config(&self) -> &ConfigAuthrpc { &self.config } + #[inline] + fn might_intercept(&self) -> bool { + self.config.deduplicate_fcus_wo_payload + } + fn should_mirror( &self, jrpc_req: &JrpcRequestMetaMaybeBatch, @@ -60,4 +80,61 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { } } } + + fn should_intercept( + &self, + jrpc_req: &JrpcRequestMetaMaybeBatch, + ) -> Option> { + let JrpcRequestMetaMaybeBatch::Single(jrpc_req) = jrpc_req else { + return None; + }; + + if !jrpc_req.method_enriched().starts_with("engine_forkchoiceUpdated") || + jrpc_req.method_enriched().ends_with("withPayload") + { + return None; + } + + let params = jrpc_req.params(); + + if params.len() != 1 { + return None; + } + + let state = params[0].as_object()?; + let head = state.get("headBlockHash")?.as_str()?; + let safe = state.get("safeBlockHash")?.as_str()?; + let finalized = state.get("finalizedBlockHash")?.as_str()?; + + let mut key = [0u8; 3 * HASH_LEN]; + key[0..HASH_LEN].copy_from_slice(head.as_bytes()); + key[HASH_LEN..2 * HASH_LEN].copy_from_slice(safe.as_bytes()); + key[2 * HASH_LEN..3 * HASH_LEN].copy_from_slice(finalized.as_bytes()); + + if !self.fcu_cache.contains_key(&key) { + self.fcu_cache.insert(key, ()); + return None; + } + + let body = if jrpc_req.id().is_number() { + format!( + r#"{{"jsonrpc":"2.0","id":{},"result":{{"payloadStatus":{{"status":"VALID","latestValidHash":"{head}"}}}}}}"#, + jrpc_req.id(), + ) + } else if jrpc_req.id().is_string() { + format!( + r#"{{"jsonrpc":"2.0","id":"{}","result":{{"payloadStatus":{{"status":"VALID","latestValidHash":"{head}"}}}}}}"#, + jrpc_req.id(), + ) + } else { + format!( + r#"{{"jsonrpc":"2.0","result":{{"payloadStatus":{{"status":"VALID","latestValidHash":"{head}"}}}}}}"#, + ) + }; + + Some(Ok(HttpResponse::Ok() + .append_header((header::CACHE_STATUS, "rproxy; hit")) + .append_header((header::CONTENT_TYPE, "application/json; charset=utf-8")) + .body(body))) + } } diff --git a/crates/rproxy/src/server/proxy/http/inner.rs b/crates/rproxy/src/server/proxy/http/inner.rs index 2447aa9..084bd94 100644 --- a/crates/rproxy/src/server/proxy/http/inner.rs +++ b/crates/rproxy/src/server/proxy/http/inner.rs @@ -13,6 +13,17 @@ where fn new(config: C) -> Self; fn config(&self) -> &C; + fn might_intercept(&self) -> bool { + false + } + + fn should_intercept( + &self, + _: &JrpcRequestMetaMaybeBatch, + ) -> Option> { + None + } + fn should_mirror( &self, jrpc_req: &JrpcRequestMetaMaybeBatch, diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b7daa14..5fa8e52 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -314,6 +314,19 @@ where let info = ProxyHttpRequestInfo::new(&clnt_req, clnt_req.conn_data::()); + if this.shared.inner.might_intercept() { + Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await + } else { + Self::stream_to_backend(this, info, clnt_req_body, timestamp).await + } + } + + async fn stream_to_backend( + this: web::Data, + info: ProxyHttpRequestInfo, + clnt_req_body: web::Payload, + timestamp: UtcDateTime, + ) -> Result { let req_id = info.req_id; let conn_id = info.conn_id; @@ -327,7 +340,141 @@ where ); let bknd_res = match bknd_req.send_stream(bknd_req_body).await { - Ok(res) => res, + Ok(bknd_res) => bknd_res, + + Err(err) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + backend_url = %this.backend.url, + error = ?err, + "Failed to proxy a request", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); + } + }; + + Self::stream_to_client(this, req_id, conn_id, bknd_res) + } + + async fn send_to_backend_and_maybe_intercept( + this: web::Data, + info: ProxyHttpRequestInfo, + clnt_req_body: web::Payload, + timestamp: UtcDateTime, + ) -> Result { + let req_id = info.req_id; + let conn_id = info.conn_id; + + let body = + match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { + Ok(Ok(body)) => body, + + Ok(Err(err)) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + backend_url = %this.backend.url, + error = ?err, + "Failed to proxy a request", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); + } + + Err(_) => { + let err = format!( + "request is too large: ?+ > {}", + this.shared.config().max_request_size(), + ); + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + backend_url = %this.backend.url, + error = ?err, + "Failed to proxy a request", + ); + this.shared + .metrics + .http_proxy_failure_count + .get_or_create(&LabelsProxy { proxy: P::name() }) + .inc(); + return Ok(HttpResponse::PayloadTooLarge().finish()); + } + }; + let size = body.len(); + + let (decompressed_body, decompressed_size) = + decompress(body.clone(), size, info.content_encoding()); + + match serde_json::from_slice::(&decompressed_body) { + Ok(jrpc) => { + if let Some(res) = this.shared.inner.should_intercept(&jrpc) { + let json_req = if this.shared.config().log_proxied_requests() { + Loggable(&Self::maybe_sanitise( + this.shared.config().log_sanitise(), + serde_json::from_slice(&decompressed_body).unwrap_or_default(), + )) + } else { + Loggable(&serde_json::Value::Null) + }; + info!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + jrpc_method = %jrpc.method_enriched(), + remote_addr = info.remote_addr, + ts_request_received = timestamp.format(&Iso8601::DEFAULT).unwrap_or_default(), + json_request = tracing::field::valuable(&json_req), + "Intercepted a request", + ); + + return res + } + } + + Err(err) => { + warn!( + proxy = P::name(), + request_id = %req_id, + connection_id = %conn_id, + worker_id = %this.id, + error = ?err, + "Failed to parse json-rpc request", + ); + } + } + + let req = ProxiedHttpRequest { + info, + body, + size, + decompressed_body, + decompressed_size, + start: timestamp, + end: UtcDateTime::now(), + }; + + let bknd_req = this.backend.new_backend_request(&req.info); + let bknd_res = match bknd_req.send_body(req.body.clone()).await { + Ok(bknd_res) => bknd_res, + Err(err) => { warn!( proxy = P::name(), @@ -343,11 +490,27 @@ where .http_proxy_failure_count .get_or_create(&LabelsProxy { proxy: P::name() }) .inc(); + return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; + this.postprocess_client_request(req); + + Self::stream_to_client(this, req_id, conn_id, bknd_res) + } + + fn stream_to_client( + this: web::Data, + req_id: Uuid, + conn_id: Uuid, + bknd_res: ClientResponse, + ) -> Result + where + S: Stream> + Unpin + 'static, + { let timestamp = UtcDateTime::now(); + let status = bknd_res.status(); let mut clnt_res = Self::to_client_response(&bknd_res); @@ -500,22 +663,28 @@ where ) { let config = inner.config(); - let json_req = if config.log_proxied_requests() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(&req.decompressed_body).unwrap_or_default(), - )) + let (json_req, http_req) = if config.log_proxied_requests() { + serde_json::from_slice(&req.decompressed_body).map_or( + ( + serde_json::Value::Null, + std::str::from_utf8(&req.decompressed_body).unwrap_or_default(), + ), + |json_req| (json_req, ""), + ) } else { - Loggable(&serde_json::Value::Null) + (serde_json::Value::Null, "") }; - let json_res = if config.log_proxied_responses() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(&res.decompressed_body).unwrap_or_default(), - )) + let (json_res, http_res) = if config.log_proxied_responses() { + serde_json::from_slice(&res.decompressed_body).map_or( + ( + serde_json::Value::Null, + std::str::from_utf8(&res.decompressed_body).unwrap_or_default(), + ), + |json_res| (json_res, ""), + ) } else { - Loggable(&serde_json::Value::Null) + (serde_json::Value::Null, "") }; info!( @@ -529,8 +698,10 @@ where ts_request_received = req.start().format(&Iso8601::DEFAULT).unwrap_or_default(), latency_backend = (res.start() - req.end()).as_seconds_f64(), latency_total = (res.end() - req.start()).as_seconds_f64(), - json_request = tracing::field::valuable(&json_req), - json_response = tracing::field::valuable(&json_res), + json_request = tracing::field::valuable(&Loggable(&Self::maybe_sanitise(config.log_sanitise(), json_req))), + json_response = tracing::field::valuable(&Loggable(&Self::maybe_sanitise(config.log_sanitise(), json_res))), + http_request = http_req, + http_response = http_res, "Proxied request" ); } @@ -541,22 +712,28 @@ where worker_id: Uuid, config: &C, ) { - let json_req = if config.log_mirrored_requests() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(&req.decompressed_body).unwrap_or_default(), - )) + let (json_req, http_req) = if config.log_mirrored_requests() { + serde_json::from_slice(&req.decompressed_body).map_or( + ( + serde_json::Value::Null, + std::str::from_utf8(&req.decompressed_body).unwrap_or_default(), + ), + |json_req| (json_req, ""), + ) } else { - Loggable(&serde_json::Value::Null) + (serde_json::Value::Null, "") }; - let json_res = if config.log_mirrored_responses() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(&res.decompressed_body).unwrap_or_default(), - )) + let (json_res, http_res) = if config.log_mirrored_responses() { + serde_json::from_slice(&res.decompressed_body).map_or( + ( + serde_json::Value::Null, + std::str::from_utf8(&res.decompressed_body).unwrap_or_default(), + ), + |json_res| (json_res, ""), + ) } else { - Loggable(&serde_json::Value::Null) + (serde_json::Value::Null, "") }; info!( @@ -570,8 +747,10 @@ where ts_request_received = req.start().format(&Iso8601::DEFAULT).unwrap_or_default(), latency_backend = (res.start() - req.end()).as_seconds_f64(), latency_total = (res.end() - req.start()).as_seconds_f64(), - json_request = tracing::field::valuable(&json_req), - json_response = tracing::field::valuable(&json_res), + json_request = tracing::field::valuable(&Loggable(&Self::maybe_sanitise(config.log_sanitise(), json_req))), + json_response = tracing::field::valuable(&Loggable(&Self::maybe_sanitise(config.log_sanitise(), json_res))), + http_request = http_req, + http_response = http_res, "Mirrored request" ); } diff --git a/crates/rproxy/src/server/proxy/http/rpc.rs b/crates/rproxy/src/server/proxy/http/rpc.rs index b097da1..3b1966f 100644 --- a/crates/rproxy/src/server/proxy/http/rpc.rs +++ b/crates/rproxy/src/server/proxy/http/rpc.rs @@ -27,6 +27,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { Self { config } } + #[inline] fn config(&self) -> &ConfigRpc { &self.config } @@ -34,7 +35,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { fn should_mirror( &self, jrpc_req: &JrpcRequestMetaMaybeBatch, - _: &ProxiedHttpRequest, + http_req: &ProxiedHttpRequest, http_res: &ProxiedHttpResponse, ) -> bool { fn should_mirror( @@ -79,7 +80,13 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { ) { Ok(jrpc_response) => jrpc_response, Err(err) => { - warn!(proxy = Self::name(), error = ?err, "Failed to parse json-rpc response"); + warn!( + proxy = Self::name(), + request_id = %http_req.info().id(), + connection_id = %http_req.info().conn_id(), + error = ?err, + "Failed to parse json-rpc response", + ); vec![JrpcResponseMeta { error: Some(JrpcError {}) }; jrpc_req_batch.len()] } }; @@ -87,6 +94,8 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { if jrpc_res_batch.len() != jrpc_req_batch.len() { warn!( proxy = Self::name(), + request_id = %http_req.info().id(), + connection_id = %http_req.info().conn_id(), "A response to jrpc-batch has mismatching count of objects (want: {}, got: {})", jrpc_req_batch.len(), jrpc_res_batch.len(), diff --git a/readme.md b/readme.md index 35eb004..6dfe36b 100644 --- a/readme.md +++ b/readme.md @@ -67,6 +67,12 @@ authrpc: [env: RPROXY_AUTHRPC_BACKEND_TIMEOUT=] [default: 30s] + --authrpc-deduplicate-fcus-wo-payload + whether authrpc proxy should deduplicate incoming fcus w/o payload (mitigates + fcu avalanche issue) + + [env: RPROXY_AUTHRPC_DEDUPLICATE_FCUS_WO_PAYLOAD=] + --authrpc-enabled enable authrpc proxy @@ -214,8 +220,7 @@ flashblocks: [env: RPROXY_FLASHBLOCKS_LOG_BACKEND_MESSAGES=] --flashblocks-log-client-messages - whether to log flashblocks backend messages whether to log flashblocks client - messages + whether to log flashblocks client messages [env: RPROXY_FLASHBLOCKS_LOG_CLIENT_MESSAGES=] From d377d7e2ae9700efa87d60644bc3ad864b9fc185 Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 28 Oct 2025 09:03:41 +0100 Subject: [PATCH 2/3] chore: use more stable api Co-authored-by: Solar Mithril --- crates/rproxy/src/server/proxy/http/authrpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 79168eb..a5f4cc9 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -33,7 +33,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { Self { config, fcu_cache: Cache::builder() - .time_to_live(Duration::from_mins(1)) + .time_to_live(Duration::from_secs(60)) .max_capacity(4096) .build(), } From 3d61b9148cc196be941d584cdd24e8badc8daf18 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Tue, 28 Oct 2025 13:24:52 +0100 Subject: [PATCH 3/3] review: change hasher, use tuple --- Cargo.lock | 1 + crates/rproxy/Cargo.toml | 1 + crates/rproxy/src/server/proxy/http/authrpc.rs | 15 +++++++++------ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1cd6ce2..f529807 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3107,6 +3107,7 @@ dependencies = [ "pin-project", "pnet", "prometheus-client", + "rustc-hash", "rustls", "rustls-pemfile", "scc", diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 6aa61cf..97e1ffa 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -38,6 +38,7 @@ parking_lot = "0.12.4" pin-project = "1.1.10" pnet = "0.35.0" prometheus-client = { git = "https://github.com/0x416e746f6e/client_rust.git", branch = "nested-labels"} +rustc-hash = "2.1.1" rustls = "0.23.32" rustls-pemfile = "2.2.0" scc = "3.0.2" diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index a5f4cc9..80edb63 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -3,6 +3,7 @@ use std::time::Duration; use actix_http::header; use actix_web::HttpResponse; use moka::sync::Cache; +use rustc_hash::FxBuildHasher; use crate::{ jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch}, @@ -17,10 +18,12 @@ const HASH_LEN: usize = 66; // 2 (for 0x) + 64 (for 32 bytes) // ProxyHttpInnerAuthrpc ----------------------------------------------- +type Hash = [u8; HASH_LEN]; + #[derive(Clone)] pub(crate) struct ProxyHttpInnerAuthrpc { config: ConfigAuthrpc, - fcu_cache: Cache<[u8; 3 * HASH_LEN], ()>, // head + safe + finalised + fcu_cache: Cache<(Hash, Hash, Hash), (), FxBuildHasher>, } impl ProxyHttpInner for ProxyHttpInnerAuthrpc { @@ -35,7 +38,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { fcu_cache: Cache::builder() .time_to_live(Duration::from_secs(60)) .max_capacity(4096) - .build(), + .build_with_hasher(FxBuildHasher), } } @@ -106,10 +109,10 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { let safe = state.get("safeBlockHash")?.as_str()?; let finalized = state.get("finalizedBlockHash")?.as_str()?; - let mut key = [0u8; 3 * HASH_LEN]; - key[0..HASH_LEN].copy_from_slice(head.as_bytes()); - key[HASH_LEN..2 * HASH_LEN].copy_from_slice(safe.as_bytes()); - key[2 * HASH_LEN..3 * HASH_LEN].copy_from_slice(finalized.as_bytes()); + let mut key = ([0; HASH_LEN], [0; HASH_LEN], [0; HASH_LEN]); + key.0.copy_from_slice(head.as_bytes()); + key.1.copy_from_slice(safe.as_bytes()); + key.2.copy_from_slice(finalized.as_bytes()); if !self.fcu_cache.contains_key(&key) { self.fcu_cache.insert(key, ());