Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 63 additions & 29 deletions desktop/Backend-Rust/src/routes/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,17 @@ async fn deepgram_ws_proxy(
}))
}

/// Which side of the proxy terminated first
#[derive(Debug)]
enum ProxyCloseOrigin {
ClientClosed,
UpstreamClosed,
ClientError,
UpstreamError,
}

/// Bidirectional WebSocket proxy between client (axum) and upstream (tokio-tungstenite).
/// When one side closes or errors, a close frame is forwarded to the other side before teardown.
async fn proxy_ws_bidirectional(
client_socket: axum::extract::ws::WebSocket,
upstream_url: &str,
Expand All @@ -210,47 +220,71 @@ async fn proxy_ws_bidirectional(

// Client → Upstream
let client_to_upstream = async {
while let Some(Ok(msg)) = client_stream.next().await {
let tung_msg = match msg {
AxumMsg::Text(t) => TungMsg::Text(t),
AxumMsg::Binary(b) => TungMsg::Binary(b),
AxumMsg::Ping(p) => TungMsg::Ping(p),
AxumMsg::Pong(p) => TungMsg::Pong(p),
AxumMsg::Close(_) => {
let _ = upstream_sink.close().await;
return;
while let Some(result) = client_stream.next().await {
match result {
Ok(msg) => {
let tung_msg = match msg {
AxumMsg::Text(t) => TungMsg::Text(t),
AxumMsg::Binary(b) => TungMsg::Binary(b),
AxumMsg::Ping(p) => TungMsg::Ping(p),
AxumMsg::Pong(p) => TungMsg::Pong(p),
AxumMsg::Close(_) => {
let _ = upstream_sink.close().await;
return ProxyCloseOrigin::ClientClosed;
}
Comment on lines +231 to +234
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Double upstream_sink.close() for ClientClosed

When the client sends a Close frame, upstream_sink.close() is already awaited here before returning ProxyCloseOrigin::ClientClosed. Then in the post-select! match block (lines 285–287), upstream_sink.close() is called a second time via the timeout wrapper.

The second call is typically harmless (the sink ignores a redundant close), but it is redundant and adds an unnecessary 5-second timeout wait for a connection that's already cleanly closed.

One way to avoid this is to skip the inner upstream_sink.close() and let the outer match block handle all close forwarding uniformly:

AxumMsg::Close(_) => {
    return ProxyCloseOrigin::ClientClosed;
}

This makes the post-select! block the single authoritative place for close forwarding.

};
if upstream_sink.send(tung_msg).await.is_err() {
return ProxyCloseOrigin::UpstreamError;
}
}
};
if upstream_sink.send(tung_msg).await.is_err() {
return;
Err(_) => return ProxyCloseOrigin::ClientError,
}
}
ProxyCloseOrigin::ClientClosed
};

// Upstream → Client
let upstream_to_client = async {
while let Some(Ok(msg)) = upstream_stream.next().await {
let axum_msg = match msg {
TungMsg::Text(t) => AxumMsg::Text(t),
TungMsg::Binary(b) => AxumMsg::Binary(b),
TungMsg::Ping(p) => AxumMsg::Ping(p),
TungMsg::Pong(p) => AxumMsg::Pong(p),
TungMsg::Close(_) => {
let _ = client_sink.close().await;
return;
while let Some(result) = upstream_stream.next().await {
match result {
Ok(msg) => {
let axum_msg = match msg {
TungMsg::Text(t) => AxumMsg::Text(t),
TungMsg::Binary(b) => AxumMsg::Binary(b),
TungMsg::Ping(p) => AxumMsg::Ping(p),
TungMsg::Pong(p) => AxumMsg::Pong(p),
TungMsg::Close(_) => {
let _ = client_sink.close().await;
return ProxyCloseOrigin::UpstreamClosed;
}
TungMsg::Frame(_) => continue,
};
if client_sink.send(axum_msg).await.is_err() {
return ProxyCloseOrigin::ClientError;
}
}
TungMsg::Frame(_) => continue,
};
if client_sink.send(axum_msg).await.is_err() {
return;
Err(_) => return ProxyCloseOrigin::UpstreamError,
}
}
ProxyCloseOrigin::UpstreamClosed
};

// Run both directions concurrently; when either ends, gracefully close the other side
let origin = tokio::select! {
origin = client_to_upstream => origin,
origin = upstream_to_client => origin,
};

// Run both directions concurrently; when either ends, drop both
tokio::select! {
_ = client_to_upstream => {},
_ = upstream_to_client => {},
// Forward close frame to the surviving side with a timeout to prevent hanging
let close_timeout = std::time::Duration::from_secs(5);
tracing::debug!("deepgram_ws_proxy: proxy ended ({:?})", origin);
match origin {
ProxyCloseOrigin::UpstreamClosed | ProxyCloseOrigin::UpstreamError => {
let _ = tokio::time::timeout(close_timeout, client_sink.close()).await;
}
ProxyCloseOrigin::ClientClosed | ProxyCloseOrigin::ClientError => {
let _ = tokio::time::timeout(close_timeout, upstream_sink.close()).await;
}
}

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion desktop/CHANGELOG.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"unreleased": [],
"unreleased": [
"Fixed WebSocket transcription disconnects: proper handshake detection, audio buffering during reconnection, unlimited retry with backoff, and thread-safe connection state"
],
"releases": [
{
"version": "0.11.186",
Expand Down
Loading
Loading