relayburn-ingest: port per-harness orchestration loops (#277)#297
relayburn-ingest: port per-harness orchestration loops (#277)#297willwashburn merged 4 commits intomainfrom
Conversation
Fills in the bodies of `ingest_claude_into`, `ingest_codex_into`, `ingest_opencode_into`, and the `ingest_claude_session` fast-path that were stubbed in #245. Each helper drives the matching reader parser incrementally with cursor + state carry-over, appends every record kind through the `Ledger` writer surface, and (for codex / opencode) folds matching pending-stamp manifests into the freshly-discovered session via `resolve_pending_stamps_for_session`. `ingest_claude_into` runs `reconcile_claude_session_relationships` once at the end of the pass so cross-file fork / continuation rows are emitted alongside per-file relationships. `ingest_claude_session` (the per-session fast-path used by the `burn run claude` adapter after the child exits) now runs the synchronous `parse_claude_session`, appends, and persists a Claude cursor at EOF so a follow-up `ingest_all` sweep skips the file. Content-mode is sourced from `load_config().content.store` (#279) and defaults to `Full` if the config layer errors. Gap-warning and `reingest_missing_content` call sites are intentionally stubbed out with `TODO(#278)` markers; the sibling PR fills them in. Adds `tests/orchestration.rs` (5 tests): - `ingest_claude_projects_round_trips_a_fixture_session` - `ingest_codex_sessions_round_trips_a_fixture_session` - `ingest_opencode_sessions_round_trips_a_fixture_session` - `ingest_all_walks_each_harness_root_once` - `ingest_claude_session_writes_eof_cursor_so_followup_skips_file` Each test pins all three IngestRoots under a temp dir and serializes on `RELAYBURN_HOME` via a module-level mutex so a parallel cargo test run can't see a leaked env mutation. The follow-up sweep in the last test asserts `appendedTurns == 0`, proving the EOF cursor wins. Also exposes `derive_codex_session_id` from the public surface — #278's `reingest_missing_content` body needs it for the codex skip-existing filter. cargo test -p relayburn-ingest: 32 tests pass (18 unit + 5 orchestration + 5 pending_stamps + 3 watch_loop + 1 doc). cargo build --workspace + cargo test --workspace clean. pnpm run build + pnpm run test still green (873 TS tests). Closes #277. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR implements the per-harness ingestion orchestration in ChangesOrchestration and Cursor-Driven Ingestion
Sequence DiagramsequenceDiagram
participant Walk as Directory Walker
participant FSMeta as Filesystem Meta
participant Cursor as Cursor Store
participant Parse as Parser (incremental)
participant Ledger as Ledger
loop For each discovered JSONL file
Walk->>FSMeta: stat file (inode, mtime, size)
FSMeta-->>Walk: metadata
Walk->>Cursor: load cursor for file
Cursor-->>Walk: last offset / inode / last_user_text
alt Rotated or reset condition
Walk->>Cursor: reset cursor state
end
Walk->>Parse: parse from offset with content_mode & carried state
Parse-->>Walk: turns, content, compactions, relationships
Walk->>Ledger: append parsed entities
Ledger-->>Walk: ack
Walk->>Cursor: persist updated cursor (offset, mtime, inode, carried state)
end
Walk->>Walk: reconcile cross-file relationships (Claude)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
crates/relayburn-ingest/src/ingest.rs (1)
909-909: 💤 Low valueUnnecessary sort on already-sorted array.
The
expected_dashesarray[8, 13, 18, 23]is already in sorted order; the.sort()call has no effect.♻️ Proposed fix
fn is_uuid(s: &str) -> bool { let groups = [8usize, 4, 4, 4, 12]; - let mut expected_dashes = [8usize, 13, 18, 23]; - expected_dashes.sort(); + let expected_dashes = [8usize, 13, 18, 23]; if s.len() != groups.iter().sum::<usize>() + (groups.len() - 1) {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-ingest/src/ingest.rs` at line 909, The array assigned to expected_dashes is already sorted ([8, 13, 18, 23]), so remove the redundant .sort() call that follows its declaration to avoid an unnecessary operation; locate the declaration of the variable expected_dashes in ingest.rs (the let mut expected_dashes = [8usize, 13, 18, 23]; line) and delete the subsequent expected_dashes.sort() invocation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@crates/relayburn-ingest/src/ingest.rs`:
- Line 909: The array assigned to expected_dashes is already sorted ([8, 13, 18,
23]), so remove the redundant .sort() call that follows its declaration to avoid
an unnecessary operation; locate the declaration of the variable expected_dashes
in ingest.rs (the let mut expected_dashes = [8usize, 13, 18, 23]; line) and
delete the subsequent expected_dashes.sort() invocation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: f0af3980-721a-4cdd-9db4-ec0e9bf64bd6
📒 Files selected for processing (3)
crates/relayburn-ingest/src/ingest.rscrates/relayburn-ingest/src/lib.rscrates/relayburn-ingest/tests/orchestration.rs
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 130ae73b4d
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| inode: file_inode(&meta), | ||
| offset_bytes: meta.len(), | ||
| mtime_ms: mtime_ms(&meta), | ||
| last_user_text: None, |
There was a problem hiding this comment.
Preserve Claude resume context in fast-path cursor
ingest_claude_session always persists last_user_text: None in the Claude cursor, but later incremental ingests depend on that seed to classify resumed assistant completions correctly when the prompt was before the resume offset. After this fast-path runs, a follow-up ingest_all on a session that later appends a completion can parse the turn without its prior prompt context and misclassify activity/user-turn attribution. Persisting resume context (or using the incremental parser result for cursor fields) avoids this regression path.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Skipping — this matches the existing TS behavior in ingestClaudeSession (packages/ingest/src/ingest.ts:285–291), which also writes the cursor with lastUserText unset:
const cursor: ClaudeCursor = {
kind: 'claude',
inode: st.ino,
offsetBytes: st.size,
mtimeMs: st.mtimeMs,
};The fast path uses the non-incremental parse_claude_session, which doesn't expose last_user_text on its ParseResult — only parse_claude_session_incremental does. Threading that field through the full parser is a TS-side change (and a divergence from #277's "faithful port" scope). If the misclassification path the comment describes is real, it's an existing TS bug worth filing separately rather than fixing only on the Rust side here.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Addressed bot feedback on
|
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/relayburn-ingest/src/ingest.rs`:
- Around line 377-385: When a file is skipped because start_offset >= size (EOF)
the code currently updates prior_claude.mtime_ms and stores it back into cursors
then continues, which prevents unchanged sessions from ever producing
ReconcileClaudeRelationshipsInput and breaks relationship linking (also see
similar block around the ingest_claude_session fast-path at 430-452). Update the
EOF-skip path so that when prior_claude exists you also emit or enqueue a
ReconcileClaudeRelationshipsInput (built from prior_claude) into the same
reconciliation collection used for parsed sessions (or mark the cursor so the
reconciler will load it later) instead of letting the file be permanently
skipped by the ingest_claude_session fast-path; keep updating mtime_ms and
storing FileCursor::Claude(c) as before but ensure the reconciler receives the
unchanged session’s relationship info so forks/continuations can be linked.
- Around line 552-566: The call to resolve_pending_stamps_for_session is
currently ignored which can drop pre-spawn manifests; change the code around
resolve_pending_stamps_for_session(...) (the call that builds
PendingStampSessionCandidate with PendingStampHarness::Codex and the analogous
block at lines 666-676) to check its Result and on Err either propagate the
error out of the enclosing function (use ? or map_err to return a meaningful
error) or, if you cannot return an error there, prevent advancing/committing the
session cursor (i.e., bail/return early or skip the commit) so the failed stamp
resolution will be retried later; apply the same Result-handling fix to the
other harness branch as well.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 206f5a0b-2b31-4937-b15e-5db1502b9d48
📒 Files selected for processing (1)
crates/relayburn-ingest/src/ingest.rs
| if !rotated && start_offset >= size { | ||
| // Nothing new; refresh mtime bookkeeping and skip parse + | ||
| // reconciliation evidence — `relationshipIdHash` dedup keeps | ||
| // re-emits idempotent. | ||
| if let Some(mut c) = prior_claude.clone() { | ||
| c.mtime_ms = mtime; | ||
| cursors.insert(key, FileCursor::Claude(c)); | ||
| } | ||
| continue; |
There was a problem hiding this comment.
Claude reconciliation misses relationships to unchanged sessions.
EOF-skipped files never contribute ReconcileClaudeRelationshipsInput, so the final reconciliation only sees sessions parsed in this sweep. That means a newly discovered fork/continuation can miss its link to an older unchanged session, and the ingest_claude_session fast-path makes this permanent by parking that older file at EOF immediately.
Also applies to: 430-452
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/relayburn-ingest/src/ingest.rs` around lines 377 - 385, When a file is
skipped because start_offset >= size (EOF) the code currently updates
prior_claude.mtime_ms and stores it back into cursors then continues, which
prevents unchanged sessions from ever producing
ReconcileClaudeRelationshipsInput and breaks relationship linking (also see
similar block around the ingest_claude_session fast-path at 430-452). Update the
EOF-skip path so that when prior_claude exists you also emit or enqueue a
ReconcileClaudeRelationshipsInput (built from prior_claude) into the same
reconciliation collection used for parsed sessions (or mark the cursor so the
reconciler will load it later) instead of letting the file be permanently
skipped by the ingest_claude_session fast-path; keep updating mtime_ms and
storing FileCursor::Claude(c) as before but ensure the reconciler receives the
unchanged session’s relationship info so forks/continuations can be linked.
| if !parsed.turns.is_empty() { | ||
| if let Some(sid) = &codex_session_id { | ||
| let cwd = next_resume | ||
| .session_cwd | ||
| .clone() | ||
| .or_else(|| parsed.turns.first().and_then(|t| t.project.clone())); | ||
| let candidate = PendingStampSessionCandidate { | ||
| harness: PendingStampHarness::Codex, | ||
| session_id: sid.clone(), | ||
| session_path: Some(file.clone()), | ||
| session_mtime_ms: Some(mtime), | ||
| cwd, | ||
| }; | ||
| let _ = resolve_pending_stamps_for_session(ledger, &candidate); | ||
| } |
There was a problem hiding this comment.
Don’t discard pending-stamp resolution failures.
Both harnesses ignore resolve_pending_stamps_for_session(...). If that write fails, the session still appends turns and advances its cursor, so the pre-spawn manifest is silently dropped and won’t be retried on later sweeps. Propagate the error, or at least avoid committing the cursor when stamp resolution fails.
Also applies to: 666-676
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/relayburn-ingest/src/ingest.rs` around lines 552 - 566, The call to
resolve_pending_stamps_for_session is currently ignored which can drop pre-spawn
manifests; change the code around resolve_pending_stamps_for_session(...) (the
call that builds PendingStampSessionCandidate with PendingStampHarness::Codex
and the analogous block at lines 666-676) to check its Result and on Err either
propagate the error out of the enclosing function (use ? or map_err to return a
meaningful error) or, if you cannot return an error there, prevent
advancing/committing the session cursor (i.e., bail/return early or skip the
commit) so the failed stamp resolution will be retried later; apply the same
Result-handling fix to the other harness branch as well.
…7-ingest-orchestration # Conflicts: # crates/relayburn-ingest/src/ingest.rs # crates/relayburn-ingest/src/lib.rs
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
crates/relayburn-ingest/src/ingest.rs (2)
283-294:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftClaude reconciliation still excludes unchanged sessions.
EOF-skipped files never contribute
ReconcileClaudeRelationshipsInput, andingest_claude_session()immediately parks fast-pathed files at EOF. That means a newly parsed fork/continuation can only reconcile against sessions re-parsed in this sweep, so links to older unchanged sessions are still missed.Also applies to: 352-360, 423-427
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-ingest/src/ingest.rs` around lines 283 - 294, The current logic persists an EOF cursor (building ClaudeCursor and calling save_cursor_changes) before reconciliation, causing unchanged sessions to be excluded from ReconcileClaudeRelationshipsInput and preventing links to older sessions; modify ingest flow so either (A) defer persisting/updating the cursor (the ClaudeCursor/FileCursor::Claude and save_cursor_changes call) until after ingest_claude_session() and reconciliation have been run, or (B) if you must fast-path and park EOF files immediately, still emit or queue a ReconcileClaudeRelationshipsInput for that file using the existing session record so unchanged sessions are included in reconciliation; update the code paths around load_cursors, the ClaudeCursor creation, and save_cursor_changes to implement one of these two fixes.
527-541:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon’t advance past failed pending-stamp resolution.
Both harnesses discard
resolve_pending_stamps_for_session(...). If that write fails, turns still append and the cursor still advances, so the pre-spawn manifest is effectively dropped and won’t be retried on later sweeps.Suggested fix
- let _ = resolve_pending_stamps_for_session(ledger, &candidate); + resolve_pending_stamps_for_session(ledger, &candidate)?;Apply the same change in the OpenCode branch as well.
Also applies to: 642-651
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-ingest/src/ingest.rs` around lines 527 - 541, The code currently ignores the Result from resolve_pending_stamps_for_session, allowing cursor advancement even if the pending-stamp write fails; change the call in both the Codex branch (where PendingStampHarness::Codex and PendingStampSessionCandidate are constructed) and the OpenCode branch to propagate errors instead of discarding them — e.g. capture the Result and use ? (or return Err) so that on failure you abort/return early and avoid appending turns or advancing next_resume; keep the candidate construction (session_id, session_path, session_mtime_ms, cwd) as-is and only change how resolve_pending_stamps_for_session(ledger, &candidate) is handled.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/relayburn-ingest/src/ingest.rs`:
- Around line 241-248: The code takes offset_bytes from the initial
fs::metadata(&file) (meta) before calling parse_claude_session(), which can
cause replay if the file grows during parsing; after calling
parse_claude_session() (and before saving the cursor), re-stat the file (e.g.,
fs::metadata(&file) or file.metadata()) and derive offset_bytes from that fresh
metadata (replace uses of the pre-parse meta/offset_bytes with the post-parse
stat), and apply the same change for the other occurrences noted (the blocks
around lines 250-256 and 283-289) so the saved cursor always points to the true
EOF after parsing.
---
Duplicate comments:
In `@crates/relayburn-ingest/src/ingest.rs`:
- Around line 283-294: The current logic persists an EOF cursor (building
ClaudeCursor and calling save_cursor_changes) before reconciliation, causing
unchanged sessions to be excluded from ReconcileClaudeRelationshipsInput and
preventing links to older sessions; modify ingest flow so either (A) defer
persisting/updating the cursor (the ClaudeCursor/FileCursor::Claude and
save_cursor_changes call) until after ingest_claude_session() and reconciliation
have been run, or (B) if you must fast-path and park EOF files immediately,
still emit or queue a ReconcileClaudeRelationshipsInput for that file using the
existing session record so unchanged sessions are included in reconciliation;
update the code paths around load_cursors, the ClaudeCursor creation, and
save_cursor_changes to implement one of these two fixes.
- Around line 527-541: The code currently ignores the Result from
resolve_pending_stamps_for_session, allowing cursor advancement even if the
pending-stamp write fails; change the call in both the Codex branch (where
PendingStampHarness::Codex and PendingStampSessionCandidate are constructed) and
the OpenCode branch to propagate errors instead of discarding them — e.g.
capture the Result and use ? (or return Err) so that on failure you abort/return
early and avoid appending turns or advancing next_resume; keep the candidate
construction (session_id, session_path, session_mtime_ms, cwd) as-is and only
change how resolve_pending_stamps_for_session(ledger, &candidate) is handled.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: d1570385-251a-426c-b803-5ff691cc99a0
📒 Files selected for processing (1)
crates/relayburn-ingest/src/ingest.rs
`ingest_claude_session` was building the EOF cursor from the pre-parse fs::metadata, but `parse_claude_session` reads via `BufReader::lines()` to actual EOF — so if the file grew during parse, the parser consumed the new bytes but the saved cursor still pointed at the pre-parse size. A follow-up `ingest_all` would replay that range and emit duplicate turns. Re-stat after parse (and before saving the cursor) so the cursor reflects what was actually read. Diverges from TS, which has the same race window; cursor position is internal state, so this stays parity-safe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
crates/relayburn-ingest/src/ingest.rs (1)
711-716: 💤 Low valuePotential truncation when cumulative exceeds
i64::MAX.Casting
u64toi64at lines 712-715 can overflow if cumulative token counts exceed 2^63. In practice this is unlikely, but a defensive.min(i64::MAX as u64) as i64would prevent silent data corruption on extreme values.♻️ Defensive casting
CodexResumeState { cumulative: ReaderCumulativeUsage { - input: c.cumulative.input as i64, - output: c.cumulative.output as i64, - cache_read: c.cumulative.cache_read as i64, - reasoning: c.cumulative.reasoning as i64, + input: c.cumulative.input.min(i64::MAX as u64) as i64, + output: c.cumulative.output.min(i64::MAX as u64) as i64, + cache_read: c.cumulative.cache_read.min(i64::MAX as u64) as i64, + reasoning: c.cumulative.reasoning.min(i64::MAX as u64) as i64, },🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-ingest/src/ingest.rs` around lines 711 - 716, The current casts of c.cumulative.* from u64 to i64 (used when constructing ReaderCumulativeUsage in ingest.rs) can overflow; update each field (input, output, cache_read, reasoning) to clamp the u64 to i64::MAX before casting (e.g. replace direct as i64 with (.min(i64::MAX as u64) as i64)) so extreme counts never silently wrap or truncate while keeping the same ReaderCumulativeUsage field types.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@crates/relayburn-ingest/src/ingest.rs`:
- Around line 711-716: The current casts of c.cumulative.* from u64 to i64 (used
when constructing ReaderCumulativeUsage in ingest.rs) can overflow; update each
field (input, output, cache_read, reasoning) to clamp the u64 to i64::MAX before
casting (e.g. replace direct as i64 with (.min(i64::MAX as u64) as i64)) so
extreme counts never silently wrap or truncate while keeping the same
ReaderCumulativeUsage field types.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 32661740-38e9-4e71-9f53-9e1487e9ae7b
📒 Files selected for processing (1)
crates/relayburn-ingest/src/ingest.rs
Summary
ingest_claude_into,ingest_codex_into,ingest_opencode_into, and theingest_claude_sessionfast-path that were stubbed in [Rust port] relayburn-ingest: discovery + watch loop (poll-based) #245. Each helper drives the matching reader parser incrementally with cursor + state carry-over, appends every record kind through theLedgerwriter surface, and (for codex / opencode) folds matching pending-stamp manifests into freshly-discovered sessions. Content-mode is sourced fromload_config().content.store([Rust port] relayburn-ledger: list_content_session_ids + Config / loadConfig surface #279) and defaults toFullif the config layer errors.reingest_missing_contentcall sites are intentionally stubbed out withTODO(#278)markers — the sibling PR fills them in. Also exposesderive_codex_session_idfrom the public surface so [Rust port] relayburn-ingest: gap-warning state machine + reingest_missing_content #278'sreingest_missing_contentbody has the codex skip-existing filter it needs.tests/orchestration.rswith 5 round-trip integration tests: per-harness session through the full parse-and-append path, aningest_allcross-harness sweep, and a fast-path EOF-cursor test that asserts a follow-upingest_allreportsappendedTurns == 0. Each test pins all threeIngestRootsunder a temp dir and serializes onRELAYBURN_HOMEvia a module-level mutex so parallel cargo test runs can't race.Test plan
cargo test -p relayburn-ingest(32 tests pass: 18 unit + 5 orchestration + 5 pending_stamps + 3 watch_loop + 1 doc).cargo build --workspace --all-targetsclean.cargo test --workspaceclean.cargo clippy -p relayburn-ingest --all-targetsclean.pnpm run build && pnpm run testclean (873 TS tests pass).Closes #277.
🤖 Generated with Claude Code