relayburn-ingest: port gap-warning + reingest_missing_content (#278)#296
relayburn-ingest: port gap-warning + reingest_missing_content (#278)#296willwashburn merged 2 commits intomainfrom
Conversation
…ontent (#278) Port the per-process content-capture gap tracker and the `reingest_missing_content` verb from `@relayburn/ingest`. The gap state lives behind an `OnceLock<Mutex<GapState>>` so suppression survives across multiple `ingest_*` calls in a single binary invocation (matches the TS `moduleGapState` lifetime exactly). Suppression semantics mirror TS verbatim: one warning per fresh affected session, silent on steady-state, re-fires after the affected set decays back to empty (or when churn introduces a fresh affected session id even at a flat net count). - `crates/relayburn-ingest/src/gap.rs` — `record_session_gap`, `emit_gap_warning`, `count_tool_call_gaps`, `count_new_tool_calls`, `count_new_tool_results`, `reset_ingest_gap_warnings`, `set_ingest_gap_writer`, `restore_ingest_gap_writer`, `AdapterName`, `ToolCallGapCounts`. Eight tests cover the TS suppression cases (fresh emit, steady-state silent, decay-to-zero re-fire, churn-keeps-flat re-fire, healed-immune, content-mode-not-full silent, empty-session-id ignore, on_warn callback override). - `crates/relayburn-ingest/src/reingest.rs` — `reingest_missing_content` walks every harness's source files, AND-skips sessions present in both `content` and `user_turns`, then re-parses the rest with `ContentStoreMode::Full` and appends only filtered records. `derive_codex_session_id` ports the rollout-uuid-suffix regex + first-line `session_meta` hint fallback. - `crates/relayburn-ledger/src/lib.rs` + `crates/relayburn-ledger/src/reader.rs` add `Ledger::list_user_turn_session_ids` (returns `HashSet<String>`, mirrors `list_content_session_ids`); the reingest skip filter AND-combines the two. Wiring the gap calls into the per-harness orchestration helpers in `ingest.rs` (`ingest_claude_into` / `ingest_codex_into` / `ingest_opencode_into`) is left to #277, which is filling those helper bodies in parallel. Closes #278. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughPorts a TypeScript ingest gap-warning state machine and a reingest_missing_content verb into Rust, adds a ledger API to list user-turn session IDs, updates module exports and changelog, and includes unit/integration tests for gap tracking and reingest behavior. ChangesGap Warning & Reingest Feature Port
Sequence Diagram(s)sequenceDiagram
participant Ingest as Ingest Orchestrator
participant Gap as Gap State Machine
participant Writer as Warning Writer/Callback
rect rgba(100, 150, 200, 0.5)
Note over Ingest,Writer: First batch with new affected session
Ingest->>Gap: record_session_gap(adapter, session_id, new_calls, new_results)
activate Gap
Gap->>Gap: Mark session as affected (no tool_results yet)
Gap->>Gap: Accumulate orphan tool_call count
deactivate Gap
Ingest->>Gap: emit_gap_warning(adapter, content_mode, on_warn)
activate Gap
Gap->>Gap: Check if session not in last emitted set
Gap->>Gap: Apply suppression (fire only for new affected)
Gap->>Writer: Write warning (callback or sink)
Gap->>Gap: Update last_emitted snapshot
deactivate Gap
end
rect rgba(150, 150, 100, 0.5)
Note over Ingest,Writer: Later batch: session gains tool_results
Ingest->>Gap: record_session_gap(adapter, session_id, 0, new_results)
activate Gap
Gap->>Gap: Mark session as healed (remove from affected)
Gap->>Gap: Clear suppression if affected set empty
deactivate Gap
Ingest->>Gap: emit_gap_warning(adapter, content_mode, on_warn)
activate Gap
Gap->>Gap: Suppression active (no new affected)
Gap->>Gap: Skip emission
deactivate Gap
end
sequenceDiagram
participant Orchestrator as reingest_missing_content
participant Ledger as Ledger (Skip Filter)
participant Parser as Per-Harness Parser
participant Ledger2 as Ledger (Append)
rect rgba(100, 200, 150, 0.5)
Note over Orchestrator,Ledger2: Reingest initialization & skip filter
Orchestrator->>Ledger: list_content_session_ids()
activate Ledger
Ledger-->>Orchestrator: content_sessions {id1, id2, ...}
deactivate Ledger
Orchestrator->>Ledger: list_user_turn_session_ids()
activate Ledger
Ledger-->>Orchestrator: user_turn_sessions {id2, id3, ...}
deactivate Ledger
Note over Orchestrator: Skip only when session in BOTH sets
end
rect rgba(200, 150, 100, 0.5)
Note over Orchestrator,Ledger2: Per-session reingest loop
Orchestrator->>Parser: parse(session, ContentStoreMode::Full)
activate Parser
Parser-->>Orchestrator: parsed {content[], user_turn[]}
deactivate Parser
Orchestrator->>Orchestrator: Filter records not in in-memory sets
Orchestrator->>Ledger2: append(missing_content)
activate Ledger2
Ledger2-->>Orchestrator: success
deactivate Ledger2
Orchestrator->>Ledger2: append(missing_user_turns)
activate Ledger2
Ledger2-->>Orchestrator: success
deactivate Ledger2
Orchestrator->>Orchestrator: Increment report counters
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9f00e0772d
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| let state = state_lock(); | ||
| (state.write)(&body); |
There was a problem hiding this comment.
Call default warning sink outside gap-state mutex
emit_gap_warning re-locks GapState and invokes state.write while still holding the mutex, which makes the default-sink path re-entrant-unsafe: if a caller has replaced the sink via set_ingest_gap_writer and that sink touches any gap API (record_session_gap, emit_gap_warning, reset_ingest_gap_warnings, etc.), the second lock attempt will deadlock the thread. This is especially easy to hit in tests or integrations that wrap warning emission in shared helpers; capture/copy the sink and call it after dropping the lock, like the on_warn branch already does.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Good catch — fixed in 9725b1f. The default-sink branch was indeed re-entrant-unsafe. Switched WriterFn from Box<dyn Fn> to Arc<dyn Fn> so we can clone the writer out of the guard, drop the lock, then invoke it (the same pattern the on_warn branch already follows). Added a regression test (default_writer_can_re_enter_gap_api_without_deadlock) that re-enters record_session_gap from inside the sink to lock in the behaviour.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/relayburn-ingest/src/gap.rs`:
- Around line 333-336: The code is invoking the user-supplied writer while
holding the global gap mutex (via state_lock()), which can deadlock if the
writer calls back into gap APIs; fix by extracting/cloning the writer from the
guarded state (the writer set by set_ingest_gap_writer(), accessible as
state.write) while the mutex is held, then drop the guard and call the
extracted/cloned writer (similar to how on_warn is handled) so the user callback
runs without the mutex held.
In `@crates/relayburn-ingest/src/lib.rs`:
- Around line 34-43: There is a type mismatch: emit_gap_warning expects
relayburn_reader::ContentStoreMode (which includes HashOnly) but the crate
re-exports ingest::ContentStoreMode (only Off/Summary/Full); fix by making the
crate consistently export the reader's enum or by changing emit_gap_warning to
use the local ingest::ContentStoreMode. Concretely, either change the pub use
list to re-export relayburn_reader::ContentStoreMode instead of
ingest::ContentStoreMode (so emit_gap_warning, tests, and callers use the same
type), or modify the emit_gap_warning signature and any callers/tests to
accept/convert to ingest::ContentStoreMode; update references to
ContentStoreMode, emit_gap_warning, and any tests that reference
ContentStoreMode::HashOnly accordingly.
In `@crates/relayburn-ingest/src/reingest.rs`:
- Around line 253-279: The current session-level filtering
(filtered_content/filtered_user_turns using
existing_content/existing_user_turns) drops rows when any row for a session
already exists; instead either filter at record granularity or just append the
full parsed slices and let Ledger::append_content / Ledger::append_user_turns
handle deduplication. Concretely, remove the .filter(|c|
!existing_content.contains(&c.session_id)) and .filter(|u|
!existing_user_turns.contains(&u.session_id)) steps (or replace them with
per-record checks such as unique record IDs), call
ledger.append_content(&content) and ledger.append_user_turns(&user_turns) with
the full parsed vectors, and update report.appended_content and
report.appended_user_turns based on the actual number appended (or the ledger
response) and only insert into existing_content/existing_user_turns after
successful append.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: cf2b4d4b-6d25-44ba-8749-a5a83caa8b1c
📒 Files selected for processing (8)
CHANGELOG.mdcrates/relayburn-ingest/src/gap.rscrates/relayburn-ingest/src/ingest.rscrates/relayburn-ingest/src/lib.rscrates/relayburn-ingest/src/reingest.rscrates/relayburn-ledger/src/lib.rscrates/relayburn-ledger/src/reader.rscrates/relayburn-ledger/src/tests.rs
- gap: invoke the default writer after dropping the state mutex. Switch `WriterFn` from `Box<dyn Fn>` to `Arc<dyn Fn>` so we can clone the writer out of the guard before calling it. Otherwise a sink that re-enters any gap API would deadlock on the non-reentrant `std::sync::Mutex`. Adds a regression test that re-enters `record_session_gap` from the sink. - ContentStoreMode: drop the local enum from `ingest.rs` and re-export `relayburn_reader::ContentStoreMode` from the crate root so `emit_gap_warning` and the reader's incremental-parse options share the same type. The local enum was stale (`Summary` variant doesn't exist in the reader or TS surface) and unused. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
gapmodule:record_session_gap,emit_gap_warning,count_tool_call_gaps,reset_ingest_gap_warnings,set_ingest_gap_writer,restore_ingest_gap_writer, plusAdapterName/ToolCallGapCounts). Suppression semantics mirror the TSmoduleGapStatebyte-for-byte: one warning per fresh affected session, silent on steady-state, re-fires after the affected set decays to empty, and churn that introduces a fresh affected session id re-emits even at a flat count.OnceLock<Mutex<GapState>>(not per-Ledger-handle), matching the TSmoduleGapStatelifetime so suppression survives across multipleingest_*calls in one binary invocation. Tying it to aLedgerhandle would have lost suppression across theburn runpath's two distinct handles (per-session pre-spawn, sweep post-spawn). Documented inline ingap.rs.reingest_missing_content(reingestmodule) — walks every harness's source files, skips sessions already present in bothcontentanduser_turns, re-parses the rest withContentStoreMode::Full, and appends filtered records.relayburn-ledgerwithLedger::list_user_turn_session_ids(one method + one test) soreingest_missing_contentcan AND-combine its skip filter withlist_content_session_idswithout materializing every user-turn row. Mirrorslist_content_session_idsfrom [Rust port] relayburn-ledger: list_content_session_ids + Config / loadConfig surface #279 in shape and defensive-filter behaviour.ingest_claude_into/ingest_codex_into/ingest_opencode_into) is left to [Rust port] relayburn-ingest: per-harness orchestration loops #277, which is filling those helper bodies in parallel.Test plan
cargo test -p relayburn-ingest(31 tests, all green — covers the eight gap-warning suppression cases plus the fourreingest_missing_contentpaths)cargo test -p relayburn-ledger(47 tests, all green —list_user_turn_session_ids_returns_distinct_setadded)cargo build --workspacecargo test --workspacepnpm run build && pnpm run test(873 tests, all green — TS surface unaffected)cargo clippy -p relayburn-ingest -p relayburn-ledgerclean for the new codeCloses #278.
🤖 Generated with Claude Code