reader (Rust): stream JSONL hot loops; use memchr crate #372
willwashburn merged 2 commits into main from
Conversation
📝 Walkthrough

Performance optimization of relayburn-sdk JSONL reader hot paths: adds a memchr dependency, refactors the Claude and Codex session readers to stream lines incrementally with reusable buffers instead of making multi-GB up-front allocations, and accelerates newline detection via memchr.

Changes: reader streaming and memchr optimization.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
Pre-merge checks: ✅ 5 passed
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ab56ca69ad
```rust
let n = match reader.read_until(b'\n', &mut line_buf) {
    Ok(0) => break,
    Ok(n) => n,
    Err(_) => break,
};
```
Propagate incremental read failures instead of truncating
The new streaming loop swallows read_until I/O errors by breaking and returning a normal parse result, which can silently drop the unread tail of a session if a mid-file read fails (for example on transient filesystem/network errors). Because parse_codex_session_incremental returns std::io::Result, callers expect real read failures to surface as errors rather than as partial success with an advanced resume state.
Good catch — fixed in 8df59ba. parse_codex_buffer now returns std::io::Result<ParseCodexIncrementalResult> and the loop uses ? to propagate, matching what claude.rs already did. Transient mid-file read failures now surface as errors instead of silently advancing the resume cursor.
Generated by Claude Code
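A minimal, self-contained illustration of the failure mode and the `?`-based fix described above (hypothetical `count_lines` as a stand-in for `parse_codex_buffer`; `FailAfter` simulates a transient mid-file read error — neither exists in the crate):

```rust
use std::io::{self, BufRead, BufReader, Cursor, Read};

// Hypothetical reader that fails after a byte budget, simulating a
// transient filesystem/network error mid-file.
struct FailAfter {
    inner: Cursor<Vec<u8>>,
    budget: usize,
}

impl Read for FailAfter {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(io::Error::new(io::ErrorKind::Other, "transient read failure"));
        }
        let cap = buf.len().min(self.budget);
        let n = self.inner.read(&mut buf[..cap])?;
        self.budget -= n;
        Ok(n)
    }
}

// Stand-in for the parse loop: `?` propagates read errors instead of
// breaking and returning a partial result.
fn count_lines<R: BufRead>(mut reader: R) -> io::Result<usize> {
    let mut line_buf = Vec::new();
    let mut count = 0;
    loop {
        line_buf.clear();
        let n = reader.read_until(b'\n', &mut line_buf)?; // propagate, don't break
        if n == 0 {
            break; // clean EOF
        }
        count += 1;
    }
    Ok(count)
}

fn main() {
    let data = b"{\"a\":1}\n{\"b\":2}\n".to_vec();
    // Healthy reader: both lines counted.
    assert_eq!(count_lines(BufReader::new(Cursor::new(data.clone()))).unwrap(), 2);
    // Failing reader: the error surfaces rather than truncating silently.
    let failing = FailAfter { inner: Cursor::new(data), budget: 8 };
    assert!(count_lines(BufReader::new(failing)).is_err());
}
```

With the old `Err(_) => break` arm, the failing case would instead return `Ok(1)` and the caller could not tell a truncated read from a clean EOF.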
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
crates/relayburn-sdk/src/reader/claude.rs (1)
2502-2502: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Return `last_user_text` from the committed boundary, not the scan tail.
`end_offset` can back up before the last lines scanned, but the result always returns the final `current_user_text` from the whole read window. If a later plain-text user line lands after that backed-up boundary, the next incremental call is seeded with prompt text from beyond its start cursor and can classify the resumed turn against the wrong prompt. Please snapshot the latest plain-text user text at the same boundary you commit to `end_offset`.
Suggested approach
```diff
+let mut user_text_updates: Vec<(u64, String)> = Vec::new();
 let mut current_user_text = options.last_user_text.clone().unwrap_or_default();
 ...
 if let Some(text) = extract_plain_user_text_from_obj(&obj) {
     if !text.is_empty() {
-        current_user_text = text;
+        current_user_text = text.clone();
+        user_text_updates.push((line_start_offset, text));
     }
 }
 ...
 let end_offset = earliest_incomplete.unwrap_or(cursor_offset);
+let committed_last_user_text = user_text_updates
+    .iter()
+    .rfind(|(off, _)| *off < end_offset)
+    .map(|(_, text)| text.clone())
+    .unwrap_or_else(|| options.last_user_text.clone().unwrap_or_default());
 ...
 end_offset,
-last_user_text: current_user_text,
+last_user_text: committed_last_user_text,
 evidence,
```

Also applies to: 2615-2618, 2847-2848
crates/relayburn-sdk/src/reader/codex.rs (1)
337-342: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Propagate `read_until` failures instead of treating them as EOF.
The new streaming loop turns any mid-stream read error into a clean break and returns a partial parse as success. That can silently truncate turns/events after transient FS errors while still advancing callers with a bogus `end_offset`.
Suggested fix
```diff
 fn parse_codex_buffer<R: BufRead>(
     mut reader: R,
     start_offset: u64,
     options: &ParseCodexIncrementalOptions,
     project_resolver: &ProjectResolver,
-) -> ParseCodexIncrementalResult {
+) -> std::io::Result<ParseCodexIncrementalResult> {
     ...
     loop {
         line_buf.clear();
-        let n = match reader.read_until(b'\n', &mut line_buf) {
-            Ok(0) => break,
-            Ok(n) => n,
-            Err(_) => break,
-        };
+        let n = reader.read_until(b'\n', &mut line_buf)?;
+        if n == 0 {
+            break;
+        }
     ...
-    ParseCodexIncrementalResult {
+    Ok(ParseCodexIncrementalResult {
         turns,
         content: content_out,
         events: events_out,
         user_turns: user_turns_out,
         relationships: relationships_out,
         tool_result_events: tool_events_out,
         end_offset: committed_end_offset,
         resume,
-    }
+    })
 }
```

And at the call site:

```diff
-    Ok(parse_codex_buffer(
+    parse_codex_buffer(
         reader,
         start_offset,
         options,
         &project_resolver,
-    ))
+    )
```

Also applies to: 398-401
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- CHANGELOG.md
- Cargo.toml
- crates/relayburn-sdk/Cargo.toml
- crates/relayburn-sdk/src/reader/claude.rs
- crates/relayburn-sdk/src/reader/codex.rs
Address review feedback on #372: the codex streaming loop was swallowing `read_until` failures via `Err(_) => break`, which would silently truncate the parse at a transient mid-file read error and advance the resume cursor as if the bytes had been processed. The claude.rs equivalents already use `?` to propagate. Bubble the error by changing `parse_codex_buffer` to return `std::io::Result<ParseCodexIncrementalResult>` and using `?` in the loop, matching `parse_codex_session_incremental`'s outer signature.
@copilot resolve the merge conflicts in this pull request
The Claude / Codex incremental and prescan paths previously built a multi-GB up-front buffer (`vec![0u8; (size - start_offset) as usize]`, `Vec::with_capacity((size - start_offset) as usize)` + `read_to_end`) to scan a session log. Switch to BufReader + `read_until(b'\n', ...)` into a reused line buffer so only the longest single line stays resident, regardless of file size. The main `parse_claude_session` loop also moves off `BufReader::lines()` (a fresh `String` per line) onto `read_line` into a reused `String`, keeping per-line allocation bounded by the longest line for sessions with tens of thousands of turns. `memchr_newline` in the codex parser was named for `memchr` but did `buf.iter().position(|&b| b == b'\n')`. Wire the actual `memchr` crate (already a transitive dep through `regex`) for SIMD-accelerated line splits; pinned at the workspace root and depended on directly from `relayburn-sdk` so we own the version. Closes #323.
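The commit-boundary streaming shape described above can be sketched roughly like this (a minimal std-only illustration, not the crate's actual code: `scan_committed_lines` is hypothetical, `Cursor` stands in for the session file, and the real loops carry parse state rather than collected strings):

```rust
use std::io::{BufRead, BufReader, Cursor};

// Stream lines through one reused buffer, advancing the committed offset
// only past lines that end in '\n'; a partial trailing line is left for
// the next incremental call so resume always happens on a '\n' boundary.
fn scan_committed_lines(data: &[u8], start_offset: u64) -> (Vec<String>, u64) {
    let mut reader = BufReader::new(Cursor::new(&data[start_offset as usize..]));
    let mut line_buf: Vec<u8> = Vec::new(); // reused across iterations
    let mut committed = start_offset;
    let mut lines = Vec::new();
    loop {
        line_buf.clear();
        let n = reader.read_until(b'\n', &mut line_buf).expect("read failed");
        if n == 0 {
            break; // clean EOF on a '\n' boundary
        }
        if line_buf.last() != Some(&b'\n') {
            break; // partial trailing line: don't commit it
        }
        committed += n as u64;
        lines.push(String::from_utf8_lossy(&line_buf[..n - 1]).into_owned());
    }
    (lines, committed)
}

fn main() {
    // Two complete JSONL lines followed by a truncated tail.
    let data = b"{\"a\":1}\n{\"b\":2}\n{\"c\":";
    let (lines, end_offset) = scan_committed_lines(data, 0);
    assert_eq!(lines, vec!["{\"a\":1}", "{\"b\":2}"]);
    assert_eq!(end_offset, 16); // the truncated tail is not committed
}
```

Only the longest single line is ever resident, regardless of file size, which is the whole point of dropping the `(size - start_offset)` up-front buffer.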
Force-pushed from 8df59ba to 459e1cf
Resolved — this branch now merges cleanly with
🧹 Nitpick comments (1)
crates/relayburn-sdk/src/reader/claude.rs (1)
2310-2332: ⚡ Quick win
Add a regression fixture for newline-boundary EOF behavior.
These loops now define the incremental commit boundary. A fixture that distinguishes a complete final JSON object without `\n` from a genuinely truncated tail would make later refactors much safer, especially since the sync and incremental entry points currently take different EOF paths.
Also applies to: 2521-2545
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- CHANGELOG.md
- Cargo.toml
- crates/relayburn-sdk/Cargo.toml
- crates/relayburn-sdk/src/reader/claude.rs
- crates/relayburn-sdk/src/reader/codex.rs
✅ Files skipped from review due to trivial changes (3)
- Cargo.toml
- crates/relayburn-sdk/Cargo.toml
- CHANGELOG.md
Summary
Tackles the two adjacent wins called out in #323. The full typed-shell refactor (the issue's primary "single biggest reader perf win" item) is not in this PR — the existing Claude/Codex parsers thread `&serde_json::Value` and `&Map<String, Value>` deeply through their helpers, so a meaningful shell would either need the helper signatures rewritten or a deserialize / re-serialize round trip that's a perf regression for matched lines. The streaming + memchr changes here are independently valuable and unblock the larger refactor.

Multi-GB up-front allocations → BufReader streaming

The Claude incremental and prescan paths built a `(size - start_offset)`-byte buffer up front; Codex incremental had the same shape (`vec![0u8; (size - start_offset) as usize]` + `read_exact`). All three switch to `BufReader` + `read_until(b'\n', &mut line_buf)` into a reused line buffer, so only the longest single line stays resident regardless of file size. Cursor offsets are tracked by accumulating the `n` returned by `read_until`; partial trailing lines (no `\n`) break the loop so the next incremental call resumes from a committed `\n` boundary, matching the prior semantics. `parse_codex_buffer` switched from `buf: &[u8]` to `<R: BufRead>(mut reader: R, ...)`. Only `parse_codex_session_incremental` calls it.

BufReader::lines() → reused String

The main `parse_claude_session_with_counter` loop allocated a fresh `String` per line via `BufReader::lines()`. Replaced with `read_line(&mut line)` into a single reused buffer.

memchr_newline actually uses memchr

`reader/codex.rs:1231` was named for `memchr` but did `buf.iter().position(|&b| b == b'\n')`. Now wired to `memchr::memchr(b'\n', buf)`. The `memchr` crate is already pulled in transitively through `regex`; added at the workspace root and depended on directly from `relayburn-sdk` so we own the version.

Test plan

- `cargo test --workspace` — all 618 SDK unit tests + 2 integration tests + bindings + doctests pass on the streaming paths (incremental cursor handling, prescan, full-file parse all exercised by existing tests).
- `burn ingest` to confirm RSS no longer scales with file size (manual; reviewer can defer).

Out of scope

Typed `#[derive(Deserialize)]` shells for the per-line shape — see issue body. Worth a follow-up that also refactors the `&Value`-taking helpers, since the win comes from skipping the `BTreeMap` build, not from the deserialize itself.

Closes #323.
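For illustration, a std-only sketch of the split `memchr_newline` performs. The byte scan below is the old fallback; the PR swaps it for `memchr::memchr(b'\n', buf)`, a SIMD-accelerated drop-in that also returns `Option<usize>`. (`split_first_line` is a hypothetical helper, not crate code.)

```rust
// Old behavior of memchr_newline: a linear byte scan for '\n'.
// The PR replaces this body with `memchr::memchr(b'\n', buf)`, which has
// the same Option<usize> contract but uses SIMD under the hood.
fn memchr_newline_fallback(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&b| b == b'\n')
}

// Hypothetical caller: split a JSONL buffer into (first line, remainder),
// excluding the '\n' itself from both halves.
fn split_first_line(buf: &[u8]) -> (&[u8], &[u8]) {
    match memchr_newline_fallback(buf) {
        Some(i) => (&buf[..i], &buf[i + 1..]),
        None => (buf, &[]),
    }
}

fn main() {
    let buf = b"{\"a\":1}\n{\"b\":2}";
    assert_eq!(memchr_newline_fallback(buf), Some(7));
    let (first, rest) = split_first_line(buf);
    assert_eq!(first, &b"{\"a\":1}"[..]);
    assert_eq!(rest, &b"{\"b\":2}"[..]);
}
```

Because the contract is identical, the swap is behavior-preserving; only the scan speed changes.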
https://claude.ai/code/session_01Mu7DKodN2MX4qWD55FW5zg
Generated by Claude Code