From d1b00ee44eeb9c0753caaf0b2693a09db0ec4592 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 27 Nov 2025 15:01:38 +0100 Subject: [PATCH 1/2] feat: add chaos-testing to ws --- Cargo.lock | 1 + build.sh | 2 + crates/rproxy/Cargo.toml | 6 + .../src/server/proxy/config/flashblocks.rs | 113 +- .../src/server/proxy/connection_guard.rs | 1 - crates/rproxy/src/server/proxy/ws/config.rs | 9 + crates/rproxy/src/server/proxy/ws/proxy.rs | 1464 +++++++++-------- readme.md | 28 + 8 files changed, 945 insertions(+), 679 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0897bd..4f26c6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3355,6 +3355,7 @@ dependencies = [ "pin-project", "pnet", "prometheus-client", + "rand 0.9.2", "rustc-hash", "rustls", "rustls-pemfile", diff --git a/build.sh b/build.sh index 977ed59..6c3b950 100755 --- a/build.sh +++ b/build.sh @@ -7,6 +7,7 @@ export LC_ALL=C export SOURCE_DATE_EPOCH=$(git log -1 --pretty=%ct) export TZ=UTC +FEATURES=${FEATURES:-default} TARGET=${TARGET:-$( rustc --print host-tuple )} export RUSTFLAGS="\ @@ -26,6 +27,7 @@ export CARGO_TARGET_$( echo "${TARGET}" | tr '[:lower:]' '[:upper:]' | tr '-' '_ " cargo build --package rproxy \ + --features ${FEATURES} \ --locked \ --release \ --target ${TARGET} diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 07aa7c0..8a6a4dc 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -11,6 +11,11 @@ path = "src/bin/main.rs" [lints] workspace = true +[features] +default = [] + +chaos = ["rand"] + [dependencies] actix = "0.13.5" actix-http = { version = "3.11.1", features = ["ws"] } @@ -41,6 +46,7 @@ parking_lot = "0.12.4" pin-project = "1.1.10" pnet = "0.35.0" prometheus-client = { git = "https://github.com/0x416e746f6e/client_rust.git", branch = "nested-labels"} +rand = { version = "0.9.2", optional = true } rustc-hash = "2.1.1" rustls = "0.23.32" rustls-pemfile = "2.2.0" diff --git a/crates/rproxy/src/server/proxy/config/flashblocks.rs 
b/crates/rproxy/src/server/proxy/config/flashblocks.rs index dece62f..54fd650 100644 --- a/crates/rproxy/src/server/proxy/config/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/config/flashblocks.rs @@ -78,9 +78,48 @@ pub(crate) struct ConfigFlashblocks { help_heading = "flashblocks", env = "RPROXY_FLASHBLOCKS_LOG_SANITISE", long("flashblocks-log-sanitise"), - name("flashblocks-log_sanitise") + name("flashblocks_log_sanitise") )] pub(crate) log_sanitise: bool, + + /// the chance (between 0.0 and 1.0) that pings received from + /// flashblocks backend would be ignored (no pong sent) + #[arg( + default_value = "0.0", + help_heading = "chaos", + env = "RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_BACKEND_PING_IGNORED", + long("chaos-probability-flashblocks-backend-ping-ignored"), + name("chaos_probability_flashblocks_backend_ping_ignored"), + value_name = "probability" + )] + #[cfg(feature = "chaos")] + pub(crate) chaos_probability_backend_ping_ignored: f64, + + /// the chance (between 0.0 and 1.0) that pings received from + /// flashblocks client would be ignored (no pong sent) + #[arg( + default_value = "0.0", + help_heading = "chaos", + env = "RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_CLIENT_PING_IGNORED", + long("chaos-probability-flashblocks-client-ping-ignored"), + name("chaos_probability_flashblocks_client_ping_ignored"), + value_name = "probability" + )] + #[cfg(feature = "chaos")] + pub(crate) chaos_probability_client_ping_ignored: f64, + + /// the chance (between 0.0 and 1.0) that client's flashblocks stream + /// would block (no more messages sent) + #[arg( + default_value = "0.0", + help_heading = "chaos", + env = "RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_STREAM_BLOCKED", + long("chaos-probability-flashblocks-stream-blocked"), + name("chaos_probability_flashblocks_stream_blocked"), + value_name = "probability" + )] + #[cfg(feature = "chaos")] + pub(crate) chaos_probability_stream_blocked: f64, } impl ConfigFlashblocks { @@ -119,6 +158,42 @@ impl ConfigFlashblocks { }) 
}); + #[cfg(feature = "chaos")] + { + // chaos_probability_flashblocks_backend_ping_ignored + if self.chaos_probability_backend_ping_ignored < 0.0 || + self.chaos_probability_backend_ping_ignored > 1.0 + { + errs.push( + ConfigFlashblocksError::ChaosProbabilityFlashblocksBackendPingIgnoredInvalid { + probability: self.chaos_probability_backend_ping_ignored, + }, + ); + } + + // chaos_probability_flashblocks_client_ping_ignored + if self.chaos_probability_client_ping_ignored < 0.0 || + self.chaos_probability_client_ping_ignored > 1.0 + { + errs.push( + ConfigFlashblocksError::ChaosProbabilityFlashblocksClientPingIgnoredInvalid { + probability: self.chaos_probability_client_ping_ignored, + }, + ); + } + + // chaos_probability_flashblocks_stream_blocked + if self.chaos_probability_stream_blocked < 0.0 || + self.chaos_probability_stream_blocked > 1.0 + { + errs.push( + ConfigFlashblocksError::ChaosProbabilityFlashblocksStreamBlockedInvalid { + probability: self.chaos_probability_stream_blocked, + }, + ); + } + } + match errs.len() { 0 => None, _ => Some(errs), @@ -156,6 +231,24 @@ impl ConfigProxyWs for ConfigFlashblocks { fn log_sanitise(&self) -> bool { self.log_sanitise } + + #[inline] + #[cfg(feature = "chaos")] + fn chaos_probability_backend_ping_ignored(&self) -> f64 { + self.chaos_probability_backend_ping_ignored + } + + #[inline] + #[cfg(feature = "chaos")] + fn chaos_probability_client_ping_ignored(&self) -> f64 { + self.chaos_probability_client_ping_ignored + } + + #[inline] + #[cfg(feature = "chaos")] + fn chaos_probability_stream_blocked(&self) -> f64 { + self.chaos_probability_stream_blocked + } } // ConfigFlashblocksError ---------------------------------------------- @@ -170,4 +263,22 @@ pub(crate) enum ConfigFlashblocksError { #[error("invalid flashblocks proxy listen address '{addr}': {err}")] ListenAddressInvalid { addr: String, err: std::net::AddrParseError }, + + #[error( + "invalid flashblocks backend ping ignore probability (must be within [0.0 .. 
1.0]): {probability}" )] #[cfg(feature = "chaos")] ChaosProbabilityFlashblocksBackendPingIgnoredInvalid { probability: f64 }, + + #[error( + "invalid flashblocks client ping ignore probability (must be within [0.0 .. 1.0]): {probability}" + )] + #[cfg(feature = "chaos")] + ChaosProbabilityFlashblocksClientPingIgnoredInvalid { probability: f64 }, + + #[error( + "invalid flashblocks stream block probability (must be within [0.0 .. 1.0]): {probability}" + )] + #[cfg(feature = "chaos")] + ChaosProbabilityFlashblocksStreamBlockedInvalid { probability: f64 }, } diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index 05faf54..d1c04ef 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -127,7 +127,6 @@ impl Drop for ConnectionGuard { self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); - #[cfg(debug_assertions)] debug!( proxy = self.proxy, connection_id = %self.id, diff --git a/crates/rproxy/src/server/proxy/ws/config.rs b/crates/rproxy/src/server/proxy/ws/config.rs index 1f31522..af17611 100644 --- a/crates/rproxy/src/server/proxy/ws/config.rs +++ b/crates/rproxy/src/server/proxy/ws/config.rs @@ -7,4 +7,13 @@ pub(crate) trait ConfigProxyWs: Clone + Send + Unpin + 'static { fn log_backend_messages(&self) -> bool; fn log_client_messages(&self) -> bool; fn log_sanitise(&self) -> bool; + + #[cfg(feature = "chaos")] + fn chaos_probability_backend_ping_ignored(&self) -> f64; + + #[cfg(feature = "chaos")] + fn chaos_probability_client_ping_ignored(&self) -> f64; + + #[cfg(feature = "chaos")] + fn chaos_probability_stream_blocked(&self) -> f64; } diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index cc912ec..392e85d 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++
b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -73,13 +73,6 @@ where resetter: broadcast::Sender<()>, backend: ProxyWsBackendEndpoint, - - pings: HashMap, - ping_balance_clnt: AtomicI64, - ping_balance_bknd: AtomicI64, - - _config: PhantomData, - _proxy: PhantomData

, } impl ProxyWs @@ -106,19 +99,7 @@ where } .start(); - Self { - id, - shared, - postprocessor, - canceller, - resetter, - backend, - pings: HashMap::new(), - ping_balance_bknd: AtomicI64::new(0), - ping_balance_clnt: AtomicI64::new(0), - _config: PhantomData, - _proxy: PhantomData, - } + Self { id, shared, postprocessor, canceller, resetter, backend } } fn config(&self) -> &C { @@ -331,434 +312,573 @@ where let (bknd_tx, bknd_rx) = bknd_stream.split(); - Self::pump(this, info, clnt_tx, clnt_rx, bknd_tx, bknd_rx).await; - } - - async fn pump( - this: web::Data, - info: ProxyHttpRequestInfo, - mut clnt_tx: Session, - mut clnt_rx: MessageStream, - mut bknd_tx: SplitSink>, tungstenite::Message>, - mut bknd_rx: SplitStream>>, - ) { - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Starting websocket pump..." - ); - - let info = Arc::new(info); - - let mut heartbeat = tokio::time::interval(Duration::from_secs(WS_PING_INTERVAL_SECONDS)); - - let mut pumping: Result<(), &str> = Ok(()); - - let mut resetter = this.resetter.subscribe(); + let mut pump = ProxyWsPump { + info: Arc::new(info), + worker_id: this.id, + shared: this.shared.clone(), + postprocessor: this.postprocessor.clone(), + canceller: this.canceller.clone(), + resetter: this.resetter.clone(), + clnt_tx, + clnt_rx, + bknd_tx, + bknd_rx, + pings: HashMap::new(), + ping_balance_bknd: AtomicI64::new(0), + ping_balance_clnt: AtomicI64::new(0), - while pumping.is_ok() && !this.canceller.is_cancelled() && !resetter.is_closed() { - tokio::select! 
{ - _ = this.canceller.cancelled() => { - break; - } + #[cfg(feature = "chaos")] + chaos: ProxyWsPumpChaos { + stream_is_blocked: std::sync::atomic::AtomicBool::new(false), + }, + }; - _ = resetter.recv() => { - break; - } + pump.run().await; + } - // ping both sides - _ = heartbeat.tick() => { - pumping = Self::heartbeat(&this, info.clone(), &mut clnt_tx, &mut bknd_tx).await; - } + fn finalise_proxying( + msg: ProxyWsMessage, + inner: Arc

, + metrics: Arc, + worker_id: Uuid, + ) { + Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id); - // client => backend - clnt_msg = clnt_rx.next() => { - pumping = Self::pump_clnt_to_bknd( - &this, - info.clone(), - UtcDateTime::now(), - clnt_msg, - &mut bknd_tx, - &mut clnt_tx - ).await; - } + Self::emit_metrics_on_proxy_success(&msg, metrics.clone()); + } - // backend => client - bknd_msg = bknd_rx.next() => { - pumping = Self::pump_bknd_to_cli( - &this, - info.clone(), - UtcDateTime::now(), - bknd_msg, - &mut clnt_tx, - &mut bknd_tx - ).await; - } - } - } + fn maybe_log_proxied_message(msg: &ProxyWsMessage, inner: Arc

, worker_id: Uuid) { + let config = inner.config(); - if let Err(msg) = pumping && - msg != WS_CLOSE_OK - { - debug!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - msg = %msg, - "Closing client websocket session..." - ); - let _ = clnt_tx // only 1 possible error (i.e. "already closed") - .close(Some(actix_ws::CloseReason { - code: awc::ws::CloseCode::Error, - description: Some(String::from(WS_BKND_ERROR)), - })) - .await; + match msg { + ProxyWsMessage::BackendToClientBinary { msg, info, start, end } => { + let json_msg = if config.log_backend_messages() { + Loggable(&Self::maybe_sanitise( + config.log_sanitise(), + serde_json::from_slice(msg).unwrap_or_default(), + )) + } else { + Loggable(&serde_json::Value::Null) + }; - debug!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - msg = %msg, - "Closing backend websocket session..." - ); - if let Err(err) = bknd_tx - .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::Error, - reason: msg.into(), - }))) - .await - { - error!( + info!( proxy = P::name(), connection_id = %info.conn_id(), - worker_id = %this.id, - msg = %msg, - error = ?err, - "Failed to close backend websocket session" + worker_id = %worker_id, + remote_addr = info.remote_addr(), + ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), + latency_proxying = (*end - *start).as_seconds_f64(), + json_msg = tracing::field::valuable(&json_msg), + "Proxied binary message to client", ); } - } else { - debug!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Closing client websocket session..." - ); - let _ = clnt_tx // only 1 possible error (i.e. 
"already closed") - .close(Some(actix_ws::CloseReason { - code: awc::ws::CloseCode::Normal, - description: None, - })) - .await; - debug!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Closing backend websocket session..." - ); - if let Err(err) = bknd_tx - .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::Normal, - reason: Utf8Bytes::default(), - }))) - .await - { - error!( + ProxyWsMessage::BackendToClientText { msg, info, start, end } => { + let json_msg = if config.log_backend_messages() { + Loggable(&Self::maybe_sanitise( + config.log_sanitise(), + serde_json::from_str(msg).unwrap_or_default(), + )) + } else { + Loggable(&serde_json::Value::Null) + }; + + info!( proxy = P::name(), connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to close backend websocket session" + worker_id = %worker_id, + remote_addr = info.remote_addr(), + ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), + latency_proxying = (*end - *start).as_seconds_f64(), + json_msg = tracing::field::valuable(&json_msg), + "Proxied text message to client", ); } - } - - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Stopped websocket pump" - ); - } - async fn heartbeat( - this: &web::Data, - info: Arc, - clnt_tx: &mut Session, - bknd_tx: &mut SplitSink>, tungstenite::Message>, - ) -> Result<(), &'static str> { - let ping_threshold = - (1 + this.config().backend_timeout().as_secs() / WS_PING_INTERVAL_SECONDS) as i64; - - { - // ping -> client + ProxyWsMessage::ClientToBackendBinary { msg, info, start, end } => { + let json_msg = if config.log_client_messages() { + Loggable(&Self::maybe_sanitise( + config.log_sanitise(), + serde_json::from_slice(msg).unwrap_or_default(), + )) + } else { + Loggable(&serde_json::Value::Null) + }; - if this.ping_balance_clnt.load(Ordering::Relaxed) > 
ping_threshold { - error!( + info!( proxy = P::name(), connection_id = %info.conn_id(), - worker_id = %this.id, - "More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold, + worker_id = %worker_id, + remote_addr = info.remote_addr(), + ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), + latency_proxying = (*end - *start).as_seconds_f64(), + json_msg = tracing::field::valuable(&json_msg), + "Proxied binary message to backend", ); - return Err(WS_CLNT_ERROR); } - let clnt_ping = ProxyWsPing::new(info.conn_id()); - if let Err(err) = clnt_tx.ping(&clnt_ping.to_slice()).await { - error!( + ProxyWsMessage::ClientToBackendText { msg, info, start, end } => { + let json_msg = if config.log_client_messages() { + Loggable(&Self::maybe_sanitise( + config.log_sanitise(), + serde_json::from_str(msg).unwrap_or_default(), + )) + } else { + Loggable(&serde_json::Value::Null) + }; + + info!( proxy = P::name(), connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to send ping websocket message to client" + worker_id = %worker_id, + remote_addr = info.remote_addr(), + ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), + latency_proxying = (*end - *start).as_seconds_f64(), + json_msg = tracing::field::valuable(&json_msg), + "Proxied text message to backend", ); - return Err(WS_CLNT_ERROR); } - let _ = this.pings.insert_sync(clnt_ping.id, clnt_ping); - this.ping_balance_clnt.inc(); } + } - { - // ping -> backend - - if this.ping_balance_bknd.load(Ordering::Relaxed) > ping_threshold { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold, - ); - return Err(WS_BKND_ERROR); - } + fn maybe_sanitise(do_sanitise: bool, mut message: serde_json::Value) -> serde_json::Value { + if !do_sanitise { + return message; + } - let bknd_ping = 
ProxyWsPing::new(info.conn_id()); - if let Err(err) = bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to send ping websocket message to backend" - ); - return Err(WS_BKND_ERROR); + if let Some(object) = message.as_object_mut() && + let Some(diff) = object.get_mut("diff") && + let Some(transactions) = diff.get_mut("transactions") && + let Some(transactions) = transactions.as_array_mut() + { + for transaction in transactions { + raw_transaction_to_hash(transaction); } - let _ = this.pings.insert_sync(bknd_ping.id, bknd_ping); - this.ping_balance_bknd.inc(); } - Ok(()) + + message } - async fn pump_clnt_to_bknd( - this: &web::Data, - info: Arc, - timestamp: UtcDateTime, - clnt_msg: Option>, - bknd_tx: &mut SplitSink>, tungstenite::Message>, - clnt_tx: &mut Session, - ) -> Result<(), &'static str> { - match clnt_msg { - Some(Ok(msg)) => { - match msg { - // binary - actix_ws::Message::Binary(bytes) => { - if let Err(err) = - bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await - { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to proxy binary websocket message to backend" - ); - this.shared - .metrics - .ws_proxy_failure_count - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BKND, - }) - .inc(); - return Err(WS_BKND_ERROR); - } - this.postprocessor.do_send(ProxyWsMessage::ClientToBackendBinary { - msg: bytes, - info, - start: timestamp, - end: UtcDateTime::now(), - }); - Ok(()) - } + fn emit_metrics_on_proxy_success(msg: &ProxyWsMessage, metrics: Arc) { + match msg { + ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => { + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; + metrics + .ws_latency_proxy + .get_or_create(&labels) + .record((1000000.0 * (*end - 
*start).as_seconds_f64()) as i64); + metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); + metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + } - // text - actix_ws::Message::Text(text) => { - if let Err(err) = bknd_tx - .send(tungstenite::Message::Text(unsafe { - // safety: it's from client's ws message => must be valid utf-8 - tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( - text.clone().into_bytes(), - ) - })) - .await - { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to proxy text websocket message to backend" - ); - this.shared - .metrics - .ws_proxy_failure_count - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BKND, - }) - .inc(); - return Err(WS_BKND_ERROR); - } - this.postprocessor.do_send(ProxyWsMessage::ClientToBackendText { - msg: text, - info, - start: timestamp, - end: UtcDateTime::now(), - }); - Ok(()) - } + ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => { + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; + metrics + .ws_latency_proxy + .get_or_create(&labels) + .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); + metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); + metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + } - // ping - actix_ws::Message::Ping(bytes) => { - if let Err(err) = clnt_tx.pong(&bytes).await { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to return pong message to client" - ); - return Err(WS_CLNT_ERROR); - } - Ok(()) - } + ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => { + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; + metrics + .ws_latency_proxy + .get_or_create(&labels) + .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); + 
metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); + metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + } - // pong - actix_ws::Message::Pong(bytes) => { - if let Some(pong) = ProxyWsPing::from_bytes(bytes) && - let Some((_, ping)) = this.pings.remove_sync(&pong.id) && - pong == ping - { - this.ping_balance_clnt.dec(); - this.shared - .metrics - .ws_latency_client - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_CLNT, - }) - .record( - (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / - 2.0) as i64, - ); - return Ok(()); - } - warn!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Unexpected websocket pong received from client", - ); - Ok(()) - } + ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => { + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; + metrics + .ws_latency_proxy + .get_or_create(&labels) + .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); + metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); + metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + } + } + } +} - // close - actix_ws::Message::Close(reason) => { - if let Err(err) = bknd_tx - .send(tungstenite::Message::Close(reason.map(|r| { - tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::from( - u16::from(r.code), - ), - reason: r.description.unwrap_or_default().into(), - } - }))) - .await - { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to close backend websocket session" - ); - return Err(WS_BKND_ERROR); - } - Err(WS_CLOSE_OK) - } +// ProxyWsSharedState -------------------------------------------------- - _ => Ok(()), +#[derive(Clone)] +struct ProxyWsSharedState +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + inner: Arc

, + metrics: Arc, + + client_connections_count: Arc, + + _config: PhantomData, +} + +impl ProxyWsSharedState +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + fn new(config: C, metrics: &Arc) -> Self { + Self { + inner: Arc::new(P::new(config)), + metrics: metrics.clone(), + client_connections_count: Arc::new(AtomicI64::new(0)), + _config: PhantomData, + } + } + + #[inline] + fn config(&self) -> &C { + self.inner.config() + } +} + +// ProxyWsBackendEndpoint ---------------------------------------------- + +struct ProxyWsBackendEndpoint +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + worker_id: Uuid, + + url: tungstenite::http::Uri, + + _config: PhantomData, + _inner: PhantomData

, +} + +impl ProxyWsBackendEndpoint +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + fn new(worker_id: Uuid, url: tungstenite::http::Uri) -> Self { + Self { worker_id, url, _config: PhantomData, _inner: PhantomData } + } + + fn new_backend_uri(&self, info: &ProxyHttpRequestInfo) -> tungstenite::http::Uri { + let mut parts = self.url.clone().into_parts(); + let pq = tungstenite::http::uri::PathAndQuery::from_str(info.path_and_query()) + .inspect_err(|err| { + error!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to re-parse client request's path and query", + ); + }) + .ok(); + parts.path_and_query = pq; + + tungstenite::http::Uri::from_parts(parts) + .inspect_err(|err| { + error!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %self.worker_id, + error = ?err, "Failed to construct backend URI, defaulting to the base one", + ); + }) + .unwrap_or(self.url.clone()) + } +} + +// ProxyWsPump --------------------------------------------------------- + +struct ProxyWsPump +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + info: Arc, + worker_id: Uuid, + + shared: ProxyWsSharedState, + postprocessor: actix::Addr>, + canceller: tokio_util::sync::CancellationToken, + resetter: broadcast::Sender<()>, + + clnt_tx: Session, + clnt_rx: MessageStream, + bknd_tx: SplitSink>, tungstenite::Message>, + bknd_rx: SplitStream>>, + + pings: HashMap, + ping_balance_clnt: AtomicI64, + ping_balance_bknd: AtomicI64, + + #[cfg(feature = "chaos")] + chaos: ProxyWsPumpChaos, +} + +impl ProxyWsPump +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + async fn run(&mut self) { + info!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Starting websocket pump..." 
+ ); + + let mut heartbeat = tokio::time::interval(Duration::from_secs(WS_PING_INTERVAL_SECONDS)); + + let mut pumping: Result<(), &str> = Ok(()); + + let mut resetter = self.resetter.subscribe(); + + while pumping.is_ok() && !self.canceller.is_cancelled() && !resetter.is_closed() { + tokio::select! { + _ = self.canceller.cancelled() => { + break; + } + + _ = resetter.recv() => { + break; + } + + // ping both sides + _ = heartbeat.tick() => { + pumping = self.heartbeat().await; + } + + // client => backend + clnt_msg = self.clnt_rx.next() => { + pumping = self.pump_clnt_to_bknd(UtcDateTime::now(), clnt_msg).await; + } + + // backend => client + bknd_msg = self.bknd_rx.next() => { + pumping = self.pump_bknd_to_cli(UtcDateTime::now(), bknd_msg).await; } } + } - Some(Err(err)) => { + if let Err(msg) = pumping && + msg != WS_CLOSE_OK + { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %msg, + "Closing client websocket session..." + ); + let _ = self // only 1 possible error (i.e. "already closed") + .clnt_tx + .clone() // .close() consumes it + .close(Some(actix_ws::CloseReason { + code: awc::ws::CloseCode::Error, + description: Some(String::from(WS_BKND_ERROR)), + })) + .await; + + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %msg, + "Closing backend websocket session..." 
+ ); + if let Err(err) = self + .bknd_tx + .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Error, + reason: msg.into(), + }))) + .await + { error!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %msg, error = ?err, - "Client websocket stream error" + "Failed to close backend websocket session" ); - Err(WS_CLNT_ERROR) } + } else { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Closing client websocket session..." + ); + let _ = self // only 1 possible error (i.e. "already closed") + .clnt_tx + .clone() // .close() consumes it + .close(Some(actix_ws::CloseReason { + code: awc::ws::CloseCode::Normal, + description: None, + })) + .await; - None => { - info!( + debug!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Client had closed websocket stream" + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Closing backend websocket session..." 
+ ); + if let Err(err) = self + .bknd_tx + .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: Utf8Bytes::default(), + }))) + .await + { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to close backend websocket session" ); - Err(WS_CLOSE_OK) } } + + info!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Stopped websocket pump" + ); + } + + async fn heartbeat(&mut self) -> Result<(), &'static str> { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + + let ping_threshold = (1 + self.shared.config().backend_timeout().as_secs() / + WS_PING_INTERVAL_SECONDS) as i64; + + { + // ping -> client + + if self.ping_balance_clnt.load(Ordering::Relaxed) > ping_threshold { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold, + ); + return Err(WS_CLNT_ERROR); + } + + let clnt_ping = ProxyWsPing::new(self.info.conn_id()); + if let Err(err) = self.clnt_tx.ping(&clnt_ping.to_slice()).await { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to send ping websocket message to client" + ); + return Err(WS_CLNT_ERROR); + } + let _ = self.pings.insert_sync(clnt_ping.id, clnt_ping); + self.ping_balance_clnt.inc(); + } + + { + // ping -> backend + + if self.ping_balance_bknd.load(Ordering::Relaxed) > ping_threshold { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold, + ); + return Err(WS_BKND_ERROR); + } + + let bknd_ping = 
ProxyWsPing::new(self.info.conn_id()); + if let Err(err) = + self.bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await + { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to send ping websocket message to backend" + ); + return Err(WS_BKND_ERROR); + } + let _ = self.pings.insert_sync(bknd_ping.id, bknd_ping); + self.ping_balance_bknd.inc(); + } + Ok(()) } - async fn pump_bknd_to_cli( - this: &web::Data, - info: Arc, + async fn pump_clnt_to_bknd( + &mut self, timestamp: UtcDateTime, - bknd_msg: Option>, - clnt_tx: &mut Session, - bknd_tx: &mut SplitSink>, tungstenite::Message>, + clnt_msg: Option>, ) -> Result<(), &'static str> { - match bknd_msg { + #[cfg(feature = "chaos")] + if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && + rand::random::() < self.shared.config().chaos_probability_stream_blocked() + { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Blocking the stream (chaos)" + ); + self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + } + + match clnt_msg { Some(Ok(msg)) => { match msg { // binary - tungstenite::Message::Binary(bytes) => { - if let Err(err) = clnt_tx.binary(bytes.clone()).await { + actix_ws::Message::Binary(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + + if let Err(err) = + self.bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await + { error!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, error = ?err, - "Failed to proxy binary websocket message to client" + "Failed to proxy binary websocket message to backend" ); - this.shared + self.shared .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_CLNT, + destination: WS_LABEL_BKND, }) 
.inc(); - return Err(WS_CLNT_ERROR); + return Err(WS_BKND_ERROR); } - this.postprocessor.do_send(ProxyWsMessage::BackendToClientBinary { + self.postprocessor.do_send(ProxyWsMessage::ClientToBackendBinary { msg: bytes, - info, + info: self.info.clone(), start: timestamp, end: UtcDateTime::now(), }); @@ -766,28 +886,42 @@ where } // text - tungstenite::Message::Text(text) => { - if let Err(err) = clnt_tx.text(text.clone().as_str()).await { + actix_ws::Message::Text(text) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + + if let Err(err) = self + .bknd_tx + .send(tungstenite::Message::Text(unsafe { + // safety: it's from client's ws message => must be valid utf-8 + tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( + text.clone().into_bytes(), + ) + })) + .await + { error!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, error = ?err, - "Failed to proxy text websocket message to client" + "Failed to proxy text websocket message to backend" ); - this.shared + self.shared .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_CLNT, + destination: WS_LABEL_BKND, }) .inc(); - return Err(WS_CLNT_ERROR); + return Err(WS_BKND_ERROR); } - this.postprocessor.do_send(ProxyWsMessage::BackendToClientText { + self.postprocessor.do_send(ProxyWsMessage::ClientToBackendText { msg: text, - info, + info: self.info.clone(), start: timestamp, end: UtcDateTime::now(), }); @@ -795,33 +929,51 @@ where } // ping - tungstenite::Message::Ping(bytes) => { - if let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await { + actix_ws::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + + #[cfg(feature = "chaos")] + if rand::random::() < + 
self.shared.config().chaos_probability_client_ping_ignored() + { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Ignored client's ping (chaos)" + ); + return Ok(()); + } + + if let Err(err) = self.clnt_tx.pong(&bytes).await { error!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, error = ?err, - "Failed to return pong message to backend" + "Failed to return pong message to client" ); - return Err(WS_BKND_ERROR); + return Err(WS_CLNT_ERROR); } Ok(()) } // pong - tungstenite::Message::Pong(bytes) => { + actix_ws::Message::Pong(bytes) => { if let Some(pong) = ProxyWsPing::from_bytes(bytes) && - let Some((_, ping)) = this.pings.remove_sync(&pong.id) && + let Some((_, ping)) = self.pings.remove_sync(&pong.id) && pong == ping { - this.ping_balance_bknd.dec(); - this.shared + self.ping_balance_clnt.dec(); + self.shared .metrics - .ws_latency_backend + .ws_latency_client .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_BKND, + destination: WS_LABEL_CLNT, }) .record( (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / @@ -831,321 +983,270 @@ where } warn!( proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Unexpected websocket pong received from backend", + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Unexpected websocket pong received from client", ); Ok(()) } // close - tungstenite::Message::Close(reason) => { - if let Err(err) = clnt_tx - .clone() // .close() consumes it - .close(reason.map(|reason| actix_ws::CloseReason { - code: u16::from(reason.code).into(), - description: reason.reason.to_string().into(), - })) - .await - { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Failed to proxy close websocket message to client" - ); - return Err(WS_CLNT_ERROR); - } - 
Err(WS_CLOSE_OK) - } - - _ => Ok(()), - } - } - - Some(Err(err)) => { - error!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - error = ?err, - "Backend websocket stream error" - ); - Err(WS_BKND_ERROR) - } - - None => { - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %this.id, - "Backend had closed websocket stream" - ); - Err(WS_CLOSE_OK) - } - } - } - - fn finalise_proxying( - msg: ProxyWsMessage, - inner: Arc

, - metrics: Arc, - worker_id: Uuid, - ) { - Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id); - - Self::emit_metrics_on_proxy_success(&msg, metrics.clone()); - } - - fn maybe_log_proxied_message(msg: &ProxyWsMessage, inner: Arc

, worker_id: Uuid) { - let config = inner.config(); - - match msg { - ProxyWsMessage::BackendToClientBinary { msg, info, start, end } => { - let json_msg = if config.log_backend_messages() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(msg).unwrap_or_default(), - )) - } else { - Loggable(&serde_json::Value::Null) - }; - - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %worker_id, - remote_addr = info.remote_addr(), - ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), - latency_proxying = (*end - *start).as_seconds_f64(), - json_msg = tracing::field::valuable(&json_msg), - "Proxied binary message to client", - ); - } - - ProxyWsMessage::BackendToClientText { msg, info, start, end } => { - let json_msg = if config.log_backend_messages() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_str(msg).unwrap_or_default(), - )) - } else { - Loggable(&serde_json::Value::Null) - }; - - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %worker_id, - remote_addr = info.remote_addr(), - ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), - latency_proxying = (*end - *start).as_seconds_f64(), - json_msg = tracing::field::valuable(&json_msg), - "Proxied text message to client", - ); - } - - ProxyWsMessage::ClientToBackendBinary { msg, info, start, end } => { - let json_msg = if config.log_client_messages() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_slice(msg).unwrap_or_default(), - )) - } else { - Loggable(&serde_json::Value::Null) - }; - - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %worker_id, - remote_addr = info.remote_addr(), - ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), - latency_proxying = (*end - *start).as_seconds_f64(), - json_msg = tracing::field::valuable(&json_msg), - "Proxied binary message to 
backend", - ); - } - - ProxyWsMessage::ClientToBackendText { msg, info, start, end } => { - let json_msg = if config.log_client_messages() { - Loggable(&Self::maybe_sanitise( - config.log_sanitise(), - serde_json::from_str(msg).unwrap_or_default(), - )) - } else { - Loggable(&serde_json::Value::Null) - }; - - info!( - proxy = P::name(), - connection_id = %info.conn_id(), - worker_id = %worker_id, - remote_addr = info.remote_addr(), - ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), - latency_proxying = (*end - *start).as_seconds_f64(), - json_msg = tracing::field::valuable(&json_msg), - "Proxied text message to backend", - ); - } - } - } - - fn maybe_sanitise(do_sanitise: bool, mut message: serde_json::Value) -> serde_json::Value { - if !do_sanitise { - return message; - } - - if let Some(object) = message.as_object_mut() && - let Some(diff) = object.get_mut("diff") && - let Some(transactions) = diff.get_mut("transactions") && - let Some(transactions) = transactions.as_array_mut() - { - for transaction in transactions { - raw_transaction_to_hash(transaction); - } - } - - message - } - - fn emit_metrics_on_proxy_success(msg: &ProxyWsMessage, metrics: Arc) { - match msg { - ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; - metrics - .ws_latency_proxy - .get_or_create(&labels) - .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); - metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); - metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); - } + actix_ws::Message::Close(reason) => { + if let Err(err) = self + .bknd_tx + .send(tungstenite::Message::Close(reason.map(|r| { + tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::from( + u16::from(r.code), + ), + reason: r.description.unwrap_or_default().into(), + } + }))) + .await + { + error!( + proxy = 
P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to close backend websocket session" + ); + return Err(WS_BKND_ERROR); + } + Err(WS_CLOSE_OK) + } - ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; - metrics - .ws_latency_proxy - .get_or_create(&labels) - .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); - metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); - metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + _ => Ok(()), + } } - ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; - metrics - .ws_latency_proxy - .get_or_create(&labels) - .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); - metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); - metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + Some(Err(err)) => { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Client websocket stream error" + ); + Err(WS_CLNT_ERROR) } - ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; - metrics - .ws_latency_proxy - .get_or_create(&labels) - .record((1000000.0 * (*end - *start).as_seconds_f64()) as i64); - metrics.ws_proxy_success_count.get_or_create_owned(&labels).inc(); - metrics.ws_message_size.get_or_create_owned(&labels).record(msg.len() as i64); + None => { + info!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Client had closed websocket stream" + ); + Err(WS_CLOSE_OK) } } } -} -// ProxyWsSharedState -------------------------------------------------- + async fn pump_bknd_to_cli( + &mut self, 
+ timestamp: UtcDateTime, + bknd_msg: Option>, + ) -> Result<(), &'static str> { + #[cfg(feature = "chaos")] + if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && + rand::random::() < self.shared.config().chaos_probability_stream_blocked() + { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Blocking the stream (chaos)" + ); + self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + } -#[derive(Clone)] -struct ProxyWsSharedState -where - C: ConfigProxyWs, - P: ProxyWsInner, -{ - inner: Arc

, - metrics: Arc, + match bknd_msg { + Some(Ok(msg)) => { + match msg { + // binary + tungstenite::Message::Binary(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } - client_connections_count: Arc, + if let Err(err) = self.clnt_tx.binary(bytes.clone()).await { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to proxy binary websocket message to client" + ); + self.shared + .metrics + .ws_proxy_failure_count + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_CLNT, + }) + .inc(); + return Err(WS_CLNT_ERROR); + } + self.postprocessor.do_send(ProxyWsMessage::BackendToClientBinary { + msg: bytes, + info: self.info.clone(), + start: timestamp, + end: UtcDateTime::now(), + }); + Ok(()) + } - _config: PhantomData, -} + // text + tungstenite::Message::Text(text) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } -impl ProxyWsSharedState -where - C: ConfigProxyWs, - P: ProxyWsInner, -{ - fn new(config: C, metrics: &Arc) -> Self { - Self { - inner: Arc::new(P::new(config)), - metrics: metrics.clone(), - client_connections_count: Arc::new(AtomicI64::new(0)), - _config: PhantomData, - } - } + if let Err(err) = self.clnt_tx.text(text.clone().as_str()).await { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to proxy text websocket message to client" + ); + self.shared + .metrics + .ws_proxy_failure_count + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_CLNT, + }) + .inc(); + return Err(WS_CLNT_ERROR); + } + self.postprocessor.do_send(ProxyWsMessage::BackendToClientText { + msg: text, + info: self.info.clone(), + start: timestamp, + end: UtcDateTime::now(), + }); + Ok(()) + } - #[inline] - fn config(&self) -> &C { - self.inner.config() - 
} -} + // ping + tungstenite::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } -// ProxyWsBackendEndpoint ---------------------------------------------- + #[cfg(feature = "chaos")] + if rand::random::() < + self.shared.config().chaos_probability_backend_ping_ignored() + { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Ignored backend's ping (chaos)" + ); + return Ok(()); + } -struct ProxyWsBackendEndpoint -where - C: ConfigProxyWs, - P: ProxyWsInner, -{ - worker_id: Uuid, + if let Err(err) = self.bknd_tx.send(tungstenite::Message::Pong(bytes)).await + { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to return pong message to backend" + ); + return Err(WS_BKND_ERROR); + } + Ok(()) + } - url: tungstenite::http::Uri, + // pong + tungstenite::Message::Pong(bytes) => { + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && + let Some((_, ping)) = self.pings.remove_sync(&pong.id) && + pong == ping + { + self.ping_balance_bknd.dec(); + self.shared + .metrics + .ws_latency_backend + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_BKND, + }) + .record( + (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / + 2.0) as i64, + ); + return Ok(()); + } + warn!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Unexpected websocket pong received from backend", + ); + Ok(()) + } - _config: PhantomData, - _inner: PhantomData

, -} + // close + tungstenite::Message::Close(reason) => { + if let Err(err) = self + .clnt_tx + .clone() // .close() consumes it + .close(reason.map(|reason| actix_ws::CloseReason { + code: u16::from(reason.code).into(), + description: reason.reason.to_string().into(), + })) + .await + { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + error = ?err, + "Failed to proxy close websocket message to client" + ); + return Err(WS_CLNT_ERROR); + } + Err(WS_CLOSE_OK) + } -impl ProxyWsBackendEndpoint -where - C: ConfigProxyWs, - P: ProxyWsInner, -{ - fn new(worker_id: Uuid, url: tungstenite::http::Uri) -> Self { - Self { worker_id, url, _config: PhantomData, _inner: PhantomData } - } + _ => Ok(()), + } + } - fn new_backend_uri(&self, info: &ProxyHttpRequestInfo) -> tungstenite::http::Uri { - let mut parts = self.url.clone().into_parts(); - let pq = tungstenite::http::uri::PathAndQuery::from_str(info.path_and_query()) - .inspect_err(|err| { + Some(Err(err)) => { error!( proxy = P::name(), - request_id = %info.req_id(), - connection_id = %info.conn_id(), + connection_id = %self.info.conn_id(), worker_id = %self.worker_id, error = ?err, - "Failed to re-parse client request's path and query", + "Backend websocket stream error" ); - }) - .ok(); - parts.path_and_query = pq; + Err(WS_BKND_ERROR) + } - tungstenite::http::Uri::from_parts(parts) - .inspect_err(|err| { - error!( + None => { + info!( proxy = P::name(), - request_id = %info.req_id(), - connection_id = %info.conn_id(), + connection_id = %self.info.conn_id(), worker_id = %self.worker_id, - error = ?err, "Failed to construct backend URI, defaulting to the base one", + "Backend had closed websocket stream" ); - }) - .unwrap_or(self.url.clone()) + Err(WS_CLOSE_OK) + } + } } } -// ProxyWsPostprocessor +// ProxyWsPostprocessor ------------------------------------------------ struct ProxyWsPostprocessor where @@ -1274,6 +1375,15 @@ impl ProxyWsPing { } } +// 
ProxyWsPumpChaos -------------------------------------------------------- + +#[cfg(feature = "chaos")] +struct ProxyWsPumpChaos { + stream_is_blocked: std::sync::atomic::AtomicBool, +} + +// tests --------------------------------------------------------------- + #[cfg(test)] mod tests { use super::*; diff --git a/readme.md b/readme.md index cf78ba0..7b3d229 100644 --- a/readme.md +++ b/readme.md @@ -375,6 +375,34 @@ tls: [default: ] ``` +### Chaos + +These flags are enabled when `rproxy` is built with `chaos` feature enabled: + +```text +chaos: + --chaos-probability-flashblocks-backend-ping-ignored + the chance (between 0.0 and 1.0) that pings received from flashblocks backend + would be ignored (no pong sent) + + [env: RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_BACKEND_PING_IGNORED=] + [default: 0.0] + + --chaos-probability-flashblocks-client-ping-ignored + the chance (between 0.0 and 1.0) that pings received from flashblocks client + would be ignored (no pong sent) + + [env: RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_CLIENT_PING_IGNORED=] + [default: 0.0] + + --chaos-probability-flashblocks-stream-blocked + the chance (between 0.0 and 1.0) that client's flashblocks stream would block + (no more messages sent) + + [env: RPROXY_CHAOS_PROBABILITY_FLASHBLOCKS_STREAM_BLOCKED=] + [default: 0.0] +``` + ## Metrics ```prometheus From 0fd0954ee3da5aa40c9711abfe6bfae5fccb1797 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Fri, 28 Nov 2025 10:56:53 +0100 Subject: [PATCH 2/2] chore: simplify ws close handling --- crates/rproxy/src/server/proxy/ws/proxy.rs | 292 ++++++++++++--------- 1 file changed, 164 insertions(+), 128 deletions(-) diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 392e85d..5528a29 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -31,7 +31,6 @@ use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::{net::TcpStream, 
sync::broadcast}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tracing::{debug, error, info, trace, warn}; -use tungstenite::Utf8Bytes; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -55,6 +54,9 @@ const WS_CLNT_ERROR: &str = "client error"; const WS_BKND_ERROR: &str = "backend error"; const WS_CLOSE_OK: &str = ""; +const WS_CLOSE_REASON_NORMAL: &str = "normal-close"; +const WS_CLOSE_REASON_UNSPECIFIED: &str = "unexpected-close"; + const WS_LABEL_BKND: &str = "backend"; const WS_LABEL_CLNT: &str = "client"; @@ -668,87 +670,33 @@ where } } - if let Err(msg) = pumping && - msg != WS_CLOSE_OK - { - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %msg, - "Closing client websocket session..." - ); - let _ = self // only 1 possible error (i.e. "already closed") - .clnt_tx - .clone() // .close() consumes it - .close(Some(actix_ws::CloseReason { - code: awc::ws::CloseCode::Error, - description: Some(String::from(WS_BKND_ERROR)), - })) - .await; - - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %msg, - "Closing backend websocket session..." - ); - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::Error, - reason: msg.into(), - }))) - .await - { - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %msg, - error = ?err, - "Failed to close backend websocket session" - ); - } - } else { - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Closing client websocket session..." - ); - let _ = self // only 1 possible error (i.e. 
"already closed") - .clnt_tx - .clone() // .close() consumes it - .close(Some(actix_ws::CloseReason { - code: awc::ws::CloseCode::Normal, - description: None, - })) - .await; - - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Closing backend websocket session..." - ); - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::Normal, - reason: Utf8Bytes::default(), - }))) - .await - { - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - error = ?err, - "Failed to close backend websocket session" - ); - } + if let Err(reason) = pumping { + let (frame_clnt, frame_bknd) = match reason { + WS_CLOSE_OK => ( + actix_ws::CloseReason { + code: awc::ws::CloseCode::Normal, + description: WS_CLOSE_REASON_NORMAL.to_string().into(), + }, + tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }, + ), + + _ => ( + actix_ws::CloseReason { + code: awc::ws::CloseCode::Error, + description: reason.to_string().into(), + }, + tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Error, + reason: reason.into(), + }, + ), + }; + + _ = self.close_clnt_session(frame_clnt).await; + _ = self.close_bknd_session(frame_bknd).await; } info!( @@ -849,7 +797,7 @@ where match clnt_msg { Some(Ok(msg)) => { match msg { - // binary + // binary msg from client actix_ws::Message::Binary(bytes) => { #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { @@ -885,7 +833,7 @@ where Ok(()) } - // text + // text msg from client actix_ws::Message::Text(text) => { #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { @@ -928,8 +876,16 @@ where Ok(()) } - // ping + // ping msg from client 
actix_ws::Message::Ping(bytes) => { + #[cfg(debug_assertions)] + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Handling client's ping..." + ); + #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { return Ok(()); @@ -961,8 +917,16 @@ where Ok(()) } - // pong + // pong msg from client actix_ws::Message::Pong(bytes) => { + #[cfg(debug_assertions)] + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Received pong form client" + ); + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && let Some((_, ping)) = self.pings.remove_sync(&pong.id) && pong == ping @@ -990,30 +954,26 @@ where Ok(()) } - // close + // close msg from client actix_ws::Message::Close(reason) => { - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Close(reason.map(|r| { - tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::from( - u16::from(r.code), - ), - reason: r.description.unwrap_or_default().into(), - } - }))) - .await - { - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - error = ?err, - "Failed to close backend websocket session" - ); - return Err(WS_BKND_ERROR); - } - Err(WS_CLOSE_OK) + return self + .close_bknd_session( + reason + .map(|r| tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::from( + u16::from(r.code), + ), + reason: r + .description + .map_or(WS_CLOSE_REASON_NORMAL.into(), |r| r.into()), + }) + .unwrap_or(tungstenite::protocol::CloseFrame { + code: + tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }), + ) + .await; } _ => Ok(()), @@ -1134,6 +1094,14 @@ where // ping tungstenite::Message::Ping(bytes) => { + #[cfg(debug_assertions)] + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + 
"Handling backend's ping..." + ); + #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { return Ok(()); @@ -1168,6 +1136,14 @@ where // pong tungstenite::Message::Pong(bytes) => { + #[cfg(debug_assertions)] + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Received pong form backend" + ); + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && let Some((_, ping)) = self.pings.remove_sync(&pong.id) && pong == ping @@ -1197,25 +1173,19 @@ where // close tungstenite::Message::Close(reason) => { - if let Err(err) = self - .clnt_tx - .clone() // .close() consumes it - .close(reason.map(|reason| actix_ws::CloseReason { - code: u16::from(reason.code).into(), - description: reason.reason.to_string().into(), - })) - .await - { - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - error = ?err, - "Failed to proxy close websocket message to client" - ); - return Err(WS_CLNT_ERROR); - } - Err(WS_CLOSE_OK) + return self + .close_clnt_session( + reason + .map(|reason| actix_ws::CloseReason { + code: u16::from(reason.code).into(), + description: reason.reason.to_string().into(), + }) + .unwrap_or(actix_ws::CloseReason { + code: awc::ws::CloseCode::Normal, + description: WS_CLOSE_REASON_UNSPECIFIED.to_string().into(), + }), + ) + .await; } _ => Ok(()), @@ -1244,6 +1214,72 @@ where } } } + + async fn close_clnt_session( + &mut self, + frame: actix_ws::CloseReason, + ) -> Result<(), &'static str> { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = ?frame.description, + "Closing client websocket session..." 
+ ); + let _ = self // only 1 possible "already closed" error (which we ignore) + .clnt_tx + .clone() // .close() consumes it + .close(Some(frame)) + .await; + Ok(()) + } + + async fn close_bknd_session( + &mut self, + frame: tungstenite::protocol::CloseFrame, + ) -> Result<(), &'static str> { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + "Closing backend websocket session..." + ); + + if let Err(err) = self + .bknd_tx + .send(tungstenite::Message::Close(Some( + frame.clone(), // it's cheap to clone + ))) + .await + { + if let tungstenite::error::Error::Protocol(protocol_err) = err { + if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { + return Ok(()); + } + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?protocol_err, + "Failed to close backend websocket session" + ); + } else { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?err, + "Failed to close backend websocket session" + ); + } + return Err(WS_BKND_ERROR); + } + + Ok(()) + } } // ProxyWsPostprocessor ------------------------------------------------