fix(phoenix-channel): re-queue message upon send failure (#4294)
Previously, we would lose one message to the portal upon failing to send
it. We now mitigate this in two ways:

1. We now check the error from `poll_ready` and, if it fails, don't pop a
message off our buffer at all.
2. If sending still fails, we re-queue the message at the front of the buffer.

In certain scenarios, as discovered in logs from #4058, this might have
caused the loss of the "answer" message from a gateway to the client,
resulting in a state mismatch where the gateway thinks the connection is
established and the client times out waiting for the answer.
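
The two mitigations can be illustrated in isolation with a minimal, std-only sketch. This is not the code from `rust/phoenix-channel/src/lib.rs`: the `MessageSink` trait, the `Outcome` enum, `try_send_next` and `FlakySink` are hypothetical names invented for illustration, and the trait is a simplified synchronous stand-in rather than the real `futures::Sink` API.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical stand-in for the WebSocket sink (NOT the real `futures::Sink`).
pub trait MessageSink {
    /// Reports whether the sink can accept a message right now.
    fn poll_ready(&mut self) -> Poll<Result<(), String>>;
    /// Tries to hand one message to the sink.
    fn start_send(&mut self, message: String) -> Result<(), String>;
}

/// Result of one attempt to send the next buffered message.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Sent,
    NothingToSend,
    NotReady,
    Failed(String),
}

/// Sends at most one buffered message without ever dropping it on failure.
pub fn try_send_next<S: MessageSink>(sink: &mut S, pending: &mut VecDeque<String>) -> Outcome {
    match sink.poll_ready() {
        Poll::Ready(Ok(())) => match pending.pop_front() {
            None => Outcome::NothingToSend,
            // Send a clone so the original is still available if sending fails.
            Some(message) => match sink.start_send(message.clone()) {
                Ok(()) => Outcome::Sent,
                Err(e) => {
                    // Mitigation 2: put the message back at the front of the buffer.
                    pending.push_front(message);
                    Outcome::Failed(e)
                }
            },
        },
        // Mitigation 1: the sink reported an error, so the buffer is left untouched.
        Poll::Ready(Err(e)) => Outcome::Failed(e),
        Poll::Pending => Outcome::NotReady,
    }
}

/// Hypothetical test double whose failures can be toggled.
pub struct FlakySink {
    pub fail_ready: bool,
    pub fail_send: bool,
    pub sent: Vec<String>,
}

impl MessageSink for FlakySink {
    fn poll_ready(&mut self) -> Poll<Result<(), String>> {
        if self.fail_ready {
            Poll::Ready(Err("not ready".to_string()))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn start_send(&mut self, message: String) -> Result<(), String> {
        if self.fail_send {
            return Err("send failed".to_string());
        }
        self.sent.push(message);
        Ok(())
    }
}
```

In this sketch, pushing the failed message back with `push_front` rather than `push_back` keeps the original FIFO order, so an "answer" message would still go out before anything queued after it once the connection recovers.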
thomaseizinger committed Mar 25, 2024
1 parent cd5cde6 commit 7e68dff
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions rust/phoenix-channel/src/lib.rs
@@ -335,18 +335,26 @@ where
 };

 // Priority 1: Keep local buffers small and send pending messages.
-if stream.poll_ready_unpin(cx).is_ready() {
-    if let Some(message) = self.pending_messages.pop_front() {
-        tracing::trace!(target: "wire", to="portal", %message);
-
-        match stream.start_send_unpin(Message::Text(message)) {
-            Ok(()) => {}
-            Err(e) => {
-                self.reconnect_on_transient_error(InternalError::WebSocket(e));
+match stream.poll_ready_unpin(cx) {
+    Poll::Ready(Ok(())) => {
+        if let Some(message) = self.pending_messages.pop_front() {
+            match stream.start_send_unpin(Message::Text(message.clone())) {
+                Ok(()) => {
+                    tracing::trace!(target: "wire", to="portal", %message);
+                }
+                Err(e) => {
+                    self.pending_messages.push_front(message);
+                    self.reconnect_on_transient_error(InternalError::WebSocket(e));
+                }
             }
             continue;
         }
     }
+    Poll::Ready(Err(e)) => {
+        self.reconnect_on_transient_error(InternalError::WebSocket(e));
+        continue;
+    }
+    Poll::Pending => {}
 }

 // Priority 2: Handle incoming messages.