fix(broker): preserve split multi-byte UTF-8 in worker_stream (#922) #923
PTY reads can land between bytes of a multi-byte UTF-8 codepoint, and `String::from_utf8_lossy` on each chunk replaces the partial sequence with `U+FFFD`. Downstream `worker_stream` consumers (Pear, attached CLIs) then render box/line glyphs, emoji, and CJK characters as `?`. Introduce `Utf8StreamDecoder`, a small stateful decoder that holds the incomplete trailing bytes for the next chunk and only emits replacement characters for byte sequences that are definitively invalid. Use it for both the steady-state PTY read path and the watchdog drain at child exit, and flush any leftover incomplete bytes when the stream finally closes. Unit tests cover ASCII, every byte-boundary split for U+2500, U+2588, U+4E2D (CJK), and U+1F600 (emoji), full byte-by-byte streaming of a mixed string, mid-stream invalid bytes, and end-of-stream truncation.
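The decoder described above can be sketched roughly as follows. This is a minimal illustration built only on `std::str::from_utf8`, not the code from the PR: the field name `pending` is taken from the review comments, but the method bodies are assumptions about how such a decoder might work.

```rust
/// Hypothetical sketch of a stateful streaming decoder; the real
/// `Utf8StreamDecoder` in the PR may be implemented differently.
#[derive(Default)]
pub struct Utf8StreamDecoder {
    /// Bytes of an incomplete trailing sequence, held for the next chunk.
    pending: Vec<u8>,
}

impl Utf8StreamDecoder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Decode one chunk, prepending any bytes held back from the previous call.
    pub fn decode(&mut self, chunk: &[u8]) -> String {
        let mut buf = std::mem::take(&mut self.pending);
        buf.extend_from_slice(chunk);

        let mut out = String::with_capacity(buf.len());
        let mut rest: &[u8] = &buf;
        loop {
            match std::str::from_utf8(rest) {
                Ok(s) => {
                    out.push_str(s);
                    break;
                }
                Err(e) => {
                    let (valid, after) = rest.split_at(e.valid_up_to());
                    // `valid_up_to` guarantees this prefix is valid UTF-8.
                    out.push_str(std::str::from_utf8(valid).expect("valid prefix"));
                    match e.error_len() {
                        // Definitively invalid bytes: replace them and keep going.
                        Some(n) => {
                            out.push('\u{FFFD}');
                            rest = &after[n..];
                        }
                        // Input ended mid-sequence: hold the tail for the next chunk.
                        None => {
                            self.pending = after.to_vec();
                            break;
                        }
                    }
                }
            }
        }
        out
    }

    /// At end-of-stream the held bytes can no longer be completed,
    /// so they are emitted through lossy decoding (as U+FFFD).
    pub fn flush(&mut self) -> String {
        let tail = std::mem::take(&mut self.pending);
        String::from_utf8_lossy(&tail).into_owned()
    }
}
```

Feeding `[0xE2]` and then `[0x94, 0x80]` through `decode` yields an empty string followed by `─`, while a second `flush()` call returns an empty string because the pending buffer has already been taken.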
📝 Walkthrough
Adds a stateful `Utf8StreamDecoder`, exports it, and replaces lossy per-chunk decoding in PTY startup and watchdog paths with streaming decoding and a final flush so multi-byte UTF-8 characters split across reads are preserved.
Changes: Streaming UTF-8 Decoder for PTY Output
🟡 Missing utf8_decoder.flush() in pty_closed termination path silently drops trailing bytes
When the PTY channel closes normally (pty_rx.recv() returns None at line 871), the utf8_decoder is not flushed, so any incomplete multi-byte codepoint held in utf8_decoder.pending is silently dropped. This contradicts the PR's intent: the watchdog exit path (crates/broker/src/pty_worker.rs:1134-1143) correctly calls utf8_decoder.flush() to emit the remaining bytes as U+FFFD, but the pty_closed path does not. When the PTY reader closes, no further chunks will arrive to complete the pending bytes, so they are permanently lost — neither emitted via worker_stream nor included in the echo_buffer/last_output.
(Refers to lines 871-875)
Prompt for agents
In the `pty_closed` branch of `pty_rx.recv() => None` (around line 871 in crates/broker/src/pty_worker.rs), the utf8_decoder is not flushed before the stream_buffer flush and exit payload construction. The watchdog path (lines 1134-1143) shows the correct pattern: after draining all chunks through utf8_decoder.decode(), call utf8_decoder.flush() and, if non-empty, append the result to the stream_buffer (so it gets emitted via worker_stream) and to the echo_buffer (so it's included in last_output). The fix should add a utf8_decoder.flush() call before flush_stream_buffer!() in the None branch, appending any non-empty result to stream_buffer and echo_buffer.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 08e0e4e3da
    let tail = utf8_decoder.flush();
    if !tail.is_empty() {
Flush UTF-8 decoder on normal PTY channel close
Utf8StreamDecoder is flushed only in the watchdog exit path, but not when pty_rx.recv() returns None (the normal reader-closed path). If the last PTY chunk ends with an incomplete multi-byte sequence, decode() buffers those bytes and this branch never emits them, so the final output is silently dropped instead of producing the expected replacement character. This can truncate worker_stream/last_output tails on clean exits where the watchdog branch is not the one that runs.
When pty_rx.recv() returns None (the PTY reader closed cleanly), the streaming decoder was not flushed, so any incomplete trailing UTF-8 bytes were silently dropped instead of being emitted to worker_stream and echo_buffer. The watchdog exit path already handled this; mirror the same flush-and-append pattern here so clean exits don't truncate output that the watchdog branch would have surfaced.
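The flush-on-close pattern this commit describes can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not the broker code: it models `pty_rx` with a `std::sync::mpsc` channel, uses a deliberately simplified hypothetical decoder (`TailHoldingDecoder`), and collects output into a plain `String` in place of the real `echo_buffer` and `worker_stream` frames.

```rust
use std::sync::mpsc;

/// Simplified, hypothetical stand-in for the broker's decoder. It only
/// holds back an incomplete trailing sequence; once a chunk contains
/// invalid bytes it falls back to lossy decoding of the whole buffer.
#[derive(Default)]
struct TailHoldingDecoder {
    pending: Vec<u8>,
}

impl TailHoldingDecoder {
    fn decode(&mut self, chunk: &[u8]) -> String {
        self.pending.extend_from_slice(chunk);
        // Number of leading bytes that are ready to be emitted now.
        let ready_len = match std::str::from_utf8(&self.pending) {
            Ok(_) => self.pending.len(),
            // Input ended mid-sequence: emit the valid prefix, keep the tail.
            Err(e) if e.error_len().is_none() => e.valid_up_to(),
            // Invalid bytes present: emit everything via lossy decoding.
            Err(_) => self.pending.len(),
        };
        let ready: Vec<u8> = self.pending.drain(..ready_len).collect();
        String::from_utf8_lossy(&ready).into_owned()
    }

    fn flush(&mut self) -> String {
        let tail = std::mem::take(&mut self.pending);
        String::from_utf8_lossy(&tail).into_owned()
    }
}

/// Drain the channel until every sender is dropped, then flush the decoder.
fn drain_until_closed(rx: mpsc::Receiver<Vec<u8>>) -> String {
    let mut decoder = TailHoldingDecoder::default();
    let mut echo_buffer = String::new();

    // `recv()` returns `Err` once all senders are gone: the "reader closed" case.
    while let Ok(chunk) = rx.recv() {
        echo_buffer.push_str(&decoder.decode(&chunk));
    }

    // No further chunk can complete the pending bytes, so surface them
    // now instead of dropping them silently.
    let tail = decoder.flush();
    if !tail.is_empty() {
        echo_buffer.push_str(&tail);
    }
    echo_buffer
}
```

Without the final `flush()` call, a stream that ends in a truncated sequence would return only the text before it, which is the truncation the reviewers flagged.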
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/broker/src/pty_worker.rs (1)
871-900: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Missing decoder flush in normal EOF path causes data loss.

When `pty_rx.recv()` returns `None` (PTY reader closed), any incomplete trailing UTF-8 bytes buffered in `utf8_decoder` are silently dropped. The watchdog path (lines 1134–1143) correctly calls `utf8_decoder.flush()` before exit, but this path does not.

If the last PTY chunk contained an incomplete multi-byte sequence (e.g., process crashed mid-output), those bytes will be lost rather than emitted as `U+FFFD` replacement characters. This violates the PR objective: "flushes remaining buffered bytes at end-of-stream."

🔧 Proposed fix: flush decoder before agent_exit

     None => {
         // PTY reader closed — child likely exited. Flush
         // any buffered stream output before sending
         // agent_exit to preserve output ordering.
         flush_stream_buffer!();
    +    // Flush any incomplete trailing bytes from the decoder.
    +    let tail = utf8_decoder.flush();
    +    if !tail.is_empty() {
    +        echo_buffer.push_str(&tail);
    +        let _ = send_frame(&out_tx, "worker_stream", None, json!({
    +            "stream": "stdout",
    +            "chunk": tail,
    +        })).await;
    +    }
         // Emit agent_exit with any echo_buffer tail so the
         // dashboard can surface the CLI's last output.
         let clean = strip_ansi(&echo_buffer);

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/broker/src/pty_worker.rs` around lines 871 - 900, When handling the PTY EOF branch where pty_rx.recv() returns None (inside the None => block), flush the UTF-8 decoder (utf8_decoder.flush()) and append any replacement-decoded text to echo_buffer before building exit_payload and calling send_frame("agent_exit", ...); specifically, replicate the watchdog logic that calls utf8_decoder.flush() to push any pending bytes (as U+FFFD) into echo_buffer so trailing incomplete multi-byte sequences are not dropped prior to trimming and sending last_output; update the None => path around echo_buffer/trimmed/exit_payload accordingly.
ℹ️ Review info
📒 Files selected for processing (4)
- CHANGELOG.md
- crates/broker/src/pty_worker.rs
- crates/broker/src/util/mod.rs
- crates/broker/src/util/utf8_stream.rs
Fixes #922.
Problem
`crates/broker/src/pty_worker.rs` decoded each raw PTY chunk with `String::from_utf8_lossy(&chunk)` in two places (the steady-state read path and the watchdog drain). When a multi-byte UTF-8 codepoint (box-drawing glyphs, emoji, CJK characters) landed across a PTY read boundary, the partial byte sequence was replaced with `U+FFFD`. `worker_stream` consumers then rendered `�`/`?` even though the broker's VT snapshot path stayed correct, which matches what Pear was seeing in Claude Code box UI output.

Fix
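As a minimal reproduction of the failure mode, together with the standard-library signal a fix can build on, consider the sketch below. The helper names `decode_per_chunk` and `is_incomplete_tail` are hypothetical and exist only for illustration; the only real APIs used are `String::from_utf8_lossy`, `std::str::from_utf8`, and `Utf8Error::error_len`.

```rust
use std::str::from_utf8;

/// Decode every chunk independently, the way per-chunk lossy decoding does.
fn decode_per_chunk(chunks: &[&[u8]]) -> String {
    chunks.iter().map(|c| String::from_utf8_lossy(c)).collect()
}

/// True when `bytes` fails to decode only because the input ended in the
/// middle of a sequence. `error_len()` is `None` in exactly that case,
/// and `Some(_)` when the bytes are definitively invalid.
fn is_incomplete_tail(bytes: &[u8]) -> bool {
    matches!(from_utf8(bytes), Err(e) if e.error_len().is_none())
}
```

With U+2500 (`E2 94 80`) delivered as `[E2]` then `[94 80]`, `decode_per_chunk` produces replacement characters instead of `─`, whereas `is_incomplete_tail(&[0xE2])` reports that the first chunk is merely unfinished and could be held back rather than replaced.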
New `crates/broker/src/util/utf8_stream.rs` exposes `Utf8StreamDecoder`:
- Holds incomplete trailing bytes across `decode` calls and combines them with the next chunk.
- Emits `U+FFFD` only for byte sequences `std::str::from_utf8` reports as definitively invalid (`error_len()` returns `Some`), never for incomplete-but-recoverable sequences.
- `flush()` drains any remaining buffered bytes at end-of-stream and substitutes `U+FFFD` for the now-definitive truncation, so we don't silently drop bytes on child exit.

`pty_worker.rs` owns one decoder for the worker's lifetime and routes both the main `pty_rx.recv()` arm and the watchdog `try_recv` drain through it. Raw bytes are still fed to the VT parser unchanged; only the `worker_stream` text payload changes.

Tests
12 new unit tests in `util::utf8_stream::tests` cover:
- ASCII input across `decode` calls.
- `U+2500` (`E2 94 80`) split into `[E2]` then `[94 80]`, the case from the issue.
- Every byte-boundary split for `U+2588` (3-byte), `U+4E2D` 中 (3-byte CJK), and `U+1F600` 😀 (4-byte emoji).
- Mid-stream invalid bytes (`0x41 0xFF 0x42` → `A\u{FFFD}B`) and invalid continuation after a valid lead byte.
- `flush()` emits `U+FFFD` for a truncated tail and is idempotent.
- Output matches `String::from_utf8_lossy` for fully-formed inputs (no behavior change when chunks are already complete).

`cargo test -p agent-relay-broker --lib` → 625 passed, 0 failed.
`cargo clippy -p agent-relay-broker --no-deps` → clean.

Test plan
- […] (`cargo test -p agent-relay-broker --lib utf8_stream`)
- `cargo clippy -p agent-relay-broker --no-deps` clean
- […] `worker_stream`

Generated by Claude Code