Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions crates/rproxy/src/server/proxy/ws/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use scc::HashMap;
use time::{UtcDateTime, format_description::well_known::Iso8601};
use tokio::{net::TcpStream, sync::broadcast};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};
use tungstenite::Utf8Bytes;
use uuid::Uuid;
use x509_parser::asn1_rs::ToStatic;
Expand Down Expand Up @@ -72,6 +72,7 @@ where
shared: ProxyWsSharedState<C, P>,
postprocessor: actix::Addr<ProxyWsPostprocessor<C, P>>,
canceller: tokio_util::sync::CancellationToken,
resetter: broadcast::Sender<()>,

backend: ProxyWsBackendEndpoint<C, P>,

Expand All @@ -91,6 +92,7 @@ where
fn new(
shared: ProxyWsSharedState<C, P>,
canceller: tokio_util::sync::CancellationToken,
resetter: broadcast::Sender<()>,
) -> Self {
let id = Uuid::now_v7();

Expand All @@ -111,6 +113,7 @@ where
shared,
postprocessor,
canceller,
resetter,
backend,
pings: HashMap::new(),
ping_balance_bck: AtomicI64::new(0),
Expand Down Expand Up @@ -151,6 +154,7 @@ where
let shared = ProxyWsSharedState::<C, P>::new(config, &metrics);
let client_connections_count = shared.client_connections_count.clone();
let worker_canceller = canceller.clone();
let worker_resetter = resetter.clone();

info!(
proxy = P::name(),
Expand All @@ -160,7 +164,11 @@ where
);

let server = HttpServer::new(move || {
let this = web::Data::new(Self::new(shared.clone(), worker_canceller.clone()));
let this = web::Data::new(Self::new(
shared.clone(),
worker_canceller.clone(),
worker_resetter.clone(),
));

App::new()
.app_data(this)
Expand Down Expand Up @@ -224,7 +232,7 @@ where
// allow time to flush buffers on close
socket.set_linger(Some(config.backend_timeout()))?;

// allow binding to the socket whlie there are still TIME_WAIT conns
// allow binding to the socket while there are still TIME_WAIT connections
socket.set_reuse_address(true)?;

socket.bind(&socket2::SockAddr::from(config.listen_address()))?;
Expand Down Expand Up @@ -369,12 +377,18 @@ where

let mut pumping: Result<(), &str> = Ok(());

while pumping.is_ok() && !this.canceller.is_cancelled() {
let mut resetter = this.resetter.subscribe();

while pumping.is_ok() && !this.canceller.is_cancelled() && !resetter.is_closed() {
tokio::select! {
_ = this.canceller.cancelled() => {
break;
}

_ = resetter.recv() => {
break;
}

// ping both sides
_ = heartbeat.tick() => {
pumping = Self::heartbeat(&this, info.clone(), &mut cli_tx, &mut bck_tx).await;
Expand Down Expand Up @@ -409,6 +423,13 @@ where
if let Err(msg) = pumping &&
msg != WS_CLOSE_OK
{
debug!(
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
"Closing client websocket session..."
);
if let Err(err) = cli_tx
.close(Some(actix_ws::CloseReason {
code: awc::ws::CloseCode::Error,
Expand All @@ -420,11 +441,19 @@ where
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
error = ?err,
"Failed to close client websocket session"
);
}

debug!(
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
"Closing backend websocket session..."
);
if let Err(err) = bck_tx
.send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame {
code: tungstenite::protocol::frame::coding::CloseCode::Error,
Expand All @@ -436,11 +465,18 @@ where
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
msg = %msg,
error = ?err,
"Failed to close backend websocket session"
);
}
} else {
debug!(
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
"Closing client websocket session..."
);
if let Err(err) = cli_tx
.close(Some(actix_ws::CloseReason {
code: awc::ws::CloseCode::Normal,
Expand All @@ -457,6 +493,12 @@ where
);
}

debug!(
proxy = P::name(),
connection_id = %info.connection_id(),
worker_id = %this.id,
"Closing backend websocket session..."
);
if let Err(err) = bck_tx
.send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame {
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
Expand Down