diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index c468c8a..43133d2 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -30,7 +30,7 @@ use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::{net::TcpStream, sync::broadcast}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use tungstenite::Utf8Bytes; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -72,6 +72,7 @@ where shared: ProxyWsSharedState, postprocessor: actix::Addr>, canceller: tokio_util::sync::CancellationToken, + resetter: broadcast::Sender<()>, backend: ProxyWsBackendEndpoint, @@ -91,6 +92,7 @@ where fn new( shared: ProxyWsSharedState, canceller: tokio_util::sync::CancellationToken, + resetter: broadcast::Sender<()>, ) -> Self { let id = Uuid::now_v7(); @@ -111,6 +113,7 @@ where shared, postprocessor, canceller, + resetter, backend, pings: HashMap::new(), ping_balance_bck: AtomicI64::new(0), @@ -151,6 +154,7 @@ where let shared = ProxyWsSharedState::::new(config, &metrics); let client_connections_count = shared.client_connections_count.clone(); let worker_canceller = canceller.clone(); + let worker_resetter = resetter.clone(); info!( proxy = P::name(), @@ -160,7 +164,11 @@ where ); let server = HttpServer::new(move || { - let this = web::Data::new(Self::new(shared.clone(), worker_canceller.clone())); + let this = web::Data::new(Self::new( + shared.clone(), + worker_canceller.clone(), + worker_resetter.clone(), + )); App::new() .app_data(this) @@ -224,7 +232,7 @@ where // allow time to flush buffers on close socket.set_linger(Some(config.backend_timeout()))?; - // allow binding to the socket whlie there are still TIME_WAIT conns + // allow binding to the socket while there are still TIME_WAIT connections socket.set_reuse_address(true)?; socket.bind(&socket2::SockAddr::from(config.listen_address()))?; @@ -369,12 +377,18 @@ where let mut pumping: Result<(), &str> = Ok(()); - while pumping.is_ok() && !this.canceller.is_cancelled() { + let mut resetter = this.resetter.subscribe(); + + while pumping.is_ok() && !this.canceller.is_cancelled() && !resetter.is_closed() { tokio::select! { _ = this.canceller.cancelled() => { break; } + _ = resetter.recv() => { + break; + } + // ping both sides _ = heartbeat.tick() => { pumping = Self::heartbeat(&this, info.clone(), &mut cli_tx, &mut bck_tx).await; @@ -409,6 +423,13 @@ where if let Err(msg) = pumping && msg != WS_CLOSE_OK { + debug!( + proxy = P::name(), + connection_id = %info.connection_id(), + worker_id = %this.id, + msg = %msg, + "Closing client websocket session..." + ); if let Err(err) = cli_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Error, @@ -420,11 +441,19 @@ where proxy = P::name(), connection_id = %info.connection_id(), worker_id = %this.id, + msg = %msg, error = ?err, "Failed to close client websocket session" ); } + debug!( + proxy = P::name(), + connection_id = %info.connection_id(), + worker_id = %this.id, + msg = %msg, + "Closing backend websocket session..." + ); if let Err(err) = bck_tx .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { code: tungstenite::protocol::frame::coding::CloseCode::Error, @@ -436,11 +465,18 @@ where proxy = P::name(), connection_id = %info.connection_id(), worker_id = %this.id, + msg = %msg, error = ?err, "Failed to close backend websocket session" ); } } else { + debug!( + proxy = P::name(), + connection_id = %info.connection_id(), + worker_id = %this.id, + "Closing client websocket session..." + ); if let Err(err) = cli_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Normal, @@ -457,6 +493,12 @@ where ); } + debug!( + proxy = P::name(), + connection_id = %info.connection_id(), + worker_id = %this.id, + "Closing backend websocket session..." + ); if let Err(err) = bck_tx .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { code: tungstenite::protocol::frame::coding::CloseCode::Normal,