feat(ingest): multi-agent ingestion (codex/cursor/gemini) — backfill TBD#252
Conversation
📝 Walkthrough

This PR introduces multi-agent session ingestion infrastructure, including a background daemon configured via launchd, a polling-based file watcher to detect changes in Cursor and Gemini agent sessions, parsing and ingestion adapters for both agents, new CLI commands to trigger ingestion, and comprehensive tests.
Sequence Diagram

sequenceDiagram
participant Launchd as Launchd Daemon
participant CLI as watch-agents CLI
participant Watcher as AgentSessionWatcher
participant FileSystem as File System
participant Registry as AgentSessionRegistry
participant Parser as Session Parser
participant DB as SQLite/VectorStore
Launchd->>CLI: Trigger (poll_interval)
CLI->>Watcher: start()
Watcher->>Watcher: Main polling loop
loop Every poll_interval
Watcher->>FileSystem: Glob files per source
FileSystem-->>Watcher: File list
Watcher->>Registry: get(filepath)
Registry-->>Watcher: {mtime_ns, size} or None
Watcher->>FileSystem: stat(filepath)
FileSystem-->>Watcher: Current mtime_ns, size
alt File changed
Watcher->>Parser: parse_session(filepath)
Parser->>Parser: Extract, filter, classify
Parser-->>Watcher: Parsed entries
Watcher->>DB: index_chunks_to_sqlite(entries)
DB-->>Watcher: Chunk count
Watcher->>Registry: set(filepath, mtime_ns, size)
else File unchanged
Note over Watcher: Skip ingestion
end
end
Watcher->>Registry: flush() on shutdown
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
    fd, tmp_path = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp")
    with os.fdopen(fd, "w") as fh:
        json.dump(self._data, fh)
    os.rename(tmp_path, self.path)
🟢 Low brainlayer/agent_watch.py:53
On Windows, os.rename(tmp_path, self.path) raises FileExistsError when self.path already exists. After the first successful flush, all subsequent flushes fail silently — the warning is logged but the registry file is never updated. Use os.replace() for cross-platform atomic file replacement.
    - os.rename(tmp_path, self.path)
    + os.replace(tmp_path, self.path)

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/agent_watch.py around line 53:
On Windows, `os.rename(tmp_path, self.path)` raises `FileExistsError` when `self.path` already exists. After the first successful flush, all subsequent flushes fail silently — the warning is logged but the registry file is never updated. Use `os.replace()` for cross-platform atomic file replacement.
Evidence trail:
src/brainlayer/agent_watch.py lines 45-65 (commit REVIEWED_COMMIT) - shows `os.rename(tmp_path, self.path)` at line 53 with OSError handling. Python documentation at https://docs.python.org/3/library/os.html confirms `os.rename()` raises `FileExistsError` on Windows when destination exists, and recommends `os.replace()` for cross-platform overwriting.
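A minimal sketch of the cross-platform atomic flush this comment recommends; `atomic_write_json` is a hypothetical free-function stand-in for the registry's flush method, not the code in `agent_watch.py`:

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON to a temp file in the same directory, then atomically replace."""
    fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        # os.replace overwrites an existing destination on every platform,
        # unlike os.rename, which raises FileExistsError on Windows.
        os.replace(tmp_path, path)
    except OSError:
        # Don't leave the temp file behind if the replace failed.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

The temp file lives in the destination's directory so the replace stays on one filesystem, which is what makes it atomic.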
    try:
        line = json.loads(raw)
    except (json.JSONDecodeError, ValueError):
        continue
🟢 Low ingest/cursor.py:81
After json.loads() succeeds, line.get("role") is called without checking if line is a dict. If the JSON line contains a primitive like null, a string, a number, or an array, the .get() call raises AttributeError and crashes the parser. Consider validating that line is a dict before accessing it, or catch AttributeError alongside the existing exceptions.
    - try:
    -     line = json.loads(raw)
    - except (json.JSONDecodeError, ValueError):
    + try:
    +     line = json.loads(raw)
    +     if not isinstance(line, dict):
    +         continue
    + except (json.JSONDecodeError, ValueError):
          continue

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/ingest/cursor.py around lines 81-84:
After `json.loads()` succeeds, `line.get("role")` is called without checking if `line` is a dict. If the JSON line contains a primitive like `null`, a string, a number, or an array, the `.get()` call raises `AttributeError` and crashes the parser. Consider validating that `line` is a dict before accessing it, or catch `AttributeError` alongside the existing exceptions.
Evidence trail:
src/brainlayer/ingest/cursor.py lines 70-100 at REVIEWED_COMMIT. Specifically:
- Line 82: `line = json.loads(raw)`
- Lines 83-84: `except (json.JSONDecodeError, ValueError): continue`
- Line 86: `role = line.get('role')` - no type check before calling .get()
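The defensive parse this comment suggests can be sketched with a hypothetical `parse_jsonl_records` helper (not the adapter's actual loop), which keeps only JSON objects:

```python
import json


def parse_jsonl_records(lines):
    """Yield only dict records from JSONL lines, skipping malformed or non-object rows."""
    for raw in lines:
        try:
            line = json.loads(raw)
        except (json.JSONDecodeError, ValueError):
            continue
        # A valid JSON line may still be a primitive (null, number, string)
        # or an array; only dicts support .get(), so skip everything else.
        if not isinstance(line, dict):
            continue
        yield line
```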
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit bd4f7e7. Configure here.
    indexed = source.ingest(file_path)
    logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
    self.registry.set(str(file_path), **state)
    processed += 1
Missing per-file error handling blocks all ingestion
High Severity
The source.ingest(file_path) call in poll_once lacks a per-file try-except. If one file fails, the exception escapes poll_once, is caught by start(), and all remaining files in that cycle are skipped. On the next cycle the same broken file is retried (it's not in the registry), fails again, and blocks all alphabetically-subsequent files — permanently. The batch equivalents (ingest_codex_dir, ingest_cursor_dir, ingest_gemini_dir) all correctly wrap each file in try-except with a logger.warning, showing the intended pattern.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit bd4f7e7. Configure here.
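The per-file isolation pattern the batch ingesters reportedly follow can be sketched like this (hypothetical `ingest_files`/`ingest_one` names, not the adapters' real API):

```python
import logging

logger = logging.getLogger(__name__)


def ingest_files(files, ingest_one):
    """Ingest each file independently; one failure must not block the rest."""
    processed = 0
    for file_path in files:
        try:
            ingest_one(file_path)
        except Exception as exc:  # noqa: BLE001 - isolate per-file failures
            logger.warning("Ingest failed for %s: %s", file_path, exc)
            continue
        processed += 1
    return processed
```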
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/brainlayer/agent_watch.py`:
- Around line 102-125: Wrap the call to source.ingest(file_path) inside a
per-file try/except in poll_once so a single file error doesn't abort the whole
loop: call source.ingest(file_path) in a try block, on success log via
logger.info, call self.registry.set(...) and increment processed; on exception
catch Exception as e and log a per-file error including source.name, file_path,
and the exception (with traceback/context) and then continue to the next file
without updating the registry or processed count. Ensure the except does not
re-raise so remaining files and sources are still processed and the registry
flush logic remains unchanged.
- Around line 102-125: AgentSessionWatcher.poll_once currently re-parses entire
append-only JSONL sessions on any mtime/size change; change to offset-based
incremental ingestion like JSONLWatcher: persist a per-file byte offset in the
registry (keyed by file path) and on each poll call read the saved offset,
detect rewind (file.size < offset -> full reparse), open the file and seek to
offset to parse only new lines, then call
ingest_cursor_session/ingest_gemini_session (or refactor them to accept a
start_offset or file-like stream) to process only appended entries, update the
registry with the new offset (and mtime/size) after successful ingest, and keep
the existing flush logic; this avoids full re-parsing and re-embedding unchanged
content.
In `@src/brainlayer/index_new.py`:
- Around line 48-63: The fallback that inspects source_file to set created_at
uses a broad except and opens files without an encoding; narrow the exception
handling to only (OSError, json.JSONDecodeError, UnicodeDecodeError) and open
the file with encoding="utf-8" when reading lines, and update the except to
reference logger.debug("Could not extract timestamp from %s: %s", source_file,
e) for those specific exceptions; locate the logic around created_at,
source_file and the logger.debug call in this block to apply the changes (ensure
json.JSONDecodeError is imported or referenced as needed).
In `@src/brainlayer/ingest/cursor.py`:
- Around line 81-88: After json.loads(raw) in cursor.py ensure the decoded value
is a dict before calling .get(): check "if not isinstance(line, dict): continue"
immediately after parsing (and also ensure "message" is treated as a dict, e.g.,
"message = line.get('message') or {}" only after confirming line is a dict), so
that subsequent uses of line.get("role"), line.get("message"), and
_extract_text(message.get("content")) won't raise AttributeError for non-object
JSON payloads; update the block around the variables line, role, message and
text to bail out/continue on non-dict inputs.
- Around line 244-273: The current dedup logic uses only path membership in
already_indexed (see variable already_indexed and the SELECT DISTINCT
source_file query) which causes updated Cursor sessions to be skipped; fix by
either (A) extending the chunks schema to store file state (e.g., mtime, size,
checksum) and update VectorStore write/read logic so ingest_cursor_session (and
analogous code in gemini.py and codex.py) compares stored file metadata against
current file metadata to decide re-ingest, or (B) explicitly document that
ingest-* functions are one-shot-per-file and remove/clarify the path-only dedup
behavior so all three ingest paths define "already indexed" consistently;
implement the chosen approach across cursor.py, gemini.py, and codex.py and
update VectorStore read/write calls that populate/consume chunks accordingly.
In `@src/brainlayer/ingest/gemini.py`:
- Around line 218-228: Replace the direct call into the private
VectorStore._read_cursor() from the Gemini ingest code with a public helper on
VectorStore (e.g., add list_indexed_source_files(self, source: str) -> set[str])
that encapsulates the SQL ("SELECT DISTINCT source_file FROM chunks WHERE source
= ?") and returns the set of filenames; then update the Gemini adapter (the
block that currently imports VectorStore and uses _read_cursor) to call
VectorStore.list_indexed_source_files("gemini") and assign its result to
already_indexed, removing the dependency on the private _read_cursor
implementation.
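A sketch of the proposed public accessor, written here as a free function over a `sqlite3` connection rather than the real `VectorStore` method:

```python
import sqlite3


def list_indexed_source_files(conn: sqlite3.Connection, source: str) -> set[str]:
    """Public lookup so adapters don't depend on a private cursor helper."""
    rows = conn.execute(
        "SELECT DISTINCT source_file FROM chunks WHERE source = ?",
        (source,),
    ).fetchall()
    return {row[0] for row in rows}
```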
- Around line 204-207: The recursive glob
sessions_dir.glob("**/chats/session-*.json") can traverse the whole filesystem
if sessions_dir is broad; change to explicitly target the expected chats folder
and validate sessions_dir first: ensure
sessions_dir.resolve().is_relative_to(Path.home() / ".gemini" / "tmp") (or
otherwise enforce/validate the expected layout) and replace the recursive
pattern with a scoped lookup like
sessions_dir.joinpath("chats").glob("session-*.json") (or stream files via
os.scandir on sessions_dir/"chats") before applying the since_days cutoff using
cutoff and f.stat().st_mtime; if validation fails, raise or log and skip
scanning to avoid expensive wide walks.
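The scoped lookup plus validation might look like this (hypothetical function name; the `base` argument stands in for `Path.home() / ".gemini" / "tmp"`):

```python
from pathlib import Path


def discover_gemini_sessions(sessions_dir: Path, base: Path) -> list[Path]:
    """Scope discovery to the expected chats/ folder instead of a recursive walk."""
    resolved = sessions_dir.resolve()
    # Refuse to scan anything outside the expected Gemini tmp layout.
    if not resolved.is_relative_to(base.resolve()):
        raise ValueError(f"{sessions_dir} is outside {base}")
    return sorted(resolved.joinpath("chats").glob("session-*.json"))
```

`Path.is_relative_to` requires Python 3.9+, which is below this repo's tested floor of 3.11.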
- Around line 51-52: The open(file_path) call in parse_gemini_session should
explicitly specify UTF-8 encoding to avoid platform-dependent defaults; update
the call in parse_gemini_session (and any other open(...) usages in this module)
to use open(file_path, encoding="utf-8") so the JSON payload is read as UTF-8
consistently.
- Around line 36-42: The current _extract_project_from_path uses
parts.index("tmp") which picks the first "tmp" in the path and can mis-attribute
the project; change it to locate the "tmp" that is the Gemini tmp directory by
searching for the sequence (".gemini", "tmp") (or walk parts from right to left
to find a "tmp" whose parent is ".gemini") and return the following segment as
the project; if that anchored match isn't found you can fall back to using the
last occurrence of "tmp" (i.e., search parts reversed) so you don't incorrectly
pick an earlier "tmp" like /private/tmp — update _extract_project_from_path to
implement this anchored/reversed search using file_path.parts and the ".gemini"
and "tmp" tokens.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c986202a-8003-41d8-8eeb-9577323343bd
📒 Files selected for processing (11)

- scripts/launchd/com.brainlayer.agent-ingest.plist
- scripts/launchd/install.sh
- src/brainlayer/agent_watch.py
- src/brainlayer/cli/__init__.py
- src/brainlayer/index_new.py
- src/brainlayer/ingest/cursor.py
- src/brainlayer/ingest/gemini.py
- tests/test_agent_ingest_launchd.py
- tests/test_agent_session_watcher.py
- tests/test_ingest_cursor.py
- tests/test_ingest_gemini.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Cursor Bugbot
- GitHub Check: Macroscope - Correctness Check
- GitHub Check: test (3.12)
- GitHub Check: test (3.13)
- GitHub Check: test (3.11)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests
**/*.py: Use `paths.py:get_db_path()` for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Files:
tests/test_agent_ingest_launchd.py, tests/test_ingest_cursor.py, src/brainlayer/index_new.py, tests/test_agent_session_watcher.py, tests/test_ingest_gemini.py, src/brainlayer/ingest/gemini.py, src/brainlayer/ingest/cursor.py, src/brainlayer/agent_watch.py, src/brainlayer/cli/__init__.py
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/brainlayer/**/*.py: Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve `ai_code`, `stack_trace`, and `user_message` verbatim; skip `noise` entries entirely and summarize `build_log` and `dir_listing` entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Implement `brain_supersede` with safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Add `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Run linting and formatting with: `ruff check src/ && ruff format src/`
Run tests with `pytest`
Use `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations to prevent WAL bloat
Files:
src/brainlayer/index_new.py, src/brainlayer/ingest/gemini.py, src/brainlayer/ingest/cursor.py, src/brainlayer/agent_watch.py, src/brainlayer/cli/__init__.py
🧠 Learnings (14)
📚 Learning: 2026-04-13T14:12:32.868Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 239
File: scripts/launchd/com.brainlayer.decay.plist:8-24
Timestamp: 2026-04-13T14:12:32.868Z
Learning: In `src/brainlayer/decay_job.py`, `run_decay_job()` is intentionally platform-neutral and must NOT contain any macOS launchd-specific logic (e.g., `launchctl unload/load`). It is used by the CLI and tests. Any enrichment-agent coordination around decay should be handled in a launchd wrapper or a dedicated follow-up command, not inside the shared decay function.
Applied to files:
scripts/launchd/install.sh, tests/test_agent_ingest_launchd.py, scripts/launchd/com.brainlayer.agent-ingest.plist
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Applied to files:
src/brainlayer/index_new.py
📚 Learning: 2026-04-04T23:24:03.159Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-04T23:24:03.159Z
Learning: Applies to src/brainlayer/{vector_store,search}*.py : Chunk lifecycle: implement columns `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search
Applied to files:
src/brainlayer/index_new.py
📚 Learning: 2026-04-12T00:00:23.993Z
Learnt from: EtanHey
Repo: EtanHey/brainlayer PR: 0
File: :0-0
Timestamp: 2026-04-12T00:00:23.993Z
Learning: Applies to src/brainlayer/watcher.py and src/brainlayer/watcher_bridge.py: Nested Claude transcript artifacts under session folders (e.g., subagents/*.jsonl) belong to the project directory immediately under `projects/`, not to the immediate parent folder. The project root segment (the directory directly under `projects/`) is the canonical owner for watcher ingestion, regardless of how deeply nested the JSONL file is.
Applied to files:
tests/test_agent_session_watcher.py, src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T11:15:05.940Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T11:15:05.940Z
Learning: Applies to src/brainlayer/watcher.py : Watcher uses Axiom telemetry for startup, flush, error, and heartbeat (60s) events to `brainlayer-watcher` dataset
Applied to files:
tests/test_agent_session_watcher.py, src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/**/*.py : For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Applied to files:
src/brainlayer/ingest/gemini.py
📚 Learning: 2026-04-01T01:24:44.281Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-01T01:24:44.281Z
Learning: Applies to src/brainlayer/*enrichment*.py : Enrichment backend priority: Groq (primary/cloud) → Gemini (fallback) → Ollama (offline last-resort), configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
src/brainlayer/ingest/gemini.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Use LaunchAgent `com.brainlayer.watch.plist` with KeepAlive=true and Nice=10 for persistent watcher process
Applied to files:
scripts/launchd/com.brainlayer.agent-ingest.plist
📚 Learning: 2026-04-06T11:15:05.940Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T11:15:05.940Z
Learning: Applies to src/brainlayer/watcher*.py : Real-time watcher via LaunchAgent (`com.brainlayer.watch.plist`) uses 4-layer content filters: entry type whitelist → classify → chunk min-length → system-reminder strip
Applied to files:
scripts/launchd/com.brainlayer.agent-ingest.plist, src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T11:15:05.940Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T11:15:05.940Z
Learning: Applies to src/brainlayer/watcher*.py : Watcher offsets persisted in `~/.local/share/brainlayer/offsets.json` to survive restarts; implement rewind detection (file shrink = checkpoint restore)
Applied to files:
src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/watcher.py : Persist watcher offsets in `~/.local/share/brainlayer/offsets.json`; implement rewind detection (file shrink = checkpoint restore) and soft-archive reverted chunks
Applied to files:
src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/watcher.py : Send Axiom telemetry (startup, flush, error, heartbeat at 60s intervals) to `brainlayer-watcher` dataset
Applied to files:
src/brainlayer/agent_watch.py
📚 Learning: 2026-04-06T08:40:13.531Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-06T08:40:13.531Z
Learning: Applies to src/brainlayer/watcher.py : Implement 4-layer content filtering for real-time JSONL watcher: entry type whitelist → classify → chunk min-length → system-reminder strip
Applied to files:
src/brainlayer/agent_watch.py
📚 Learning: 2026-04-03T11:34:19.303Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-03T11:34:19.303Z
Learning: Applies to src/brainlayer/cli.py : Use Typer CLI framework for command-line interface in `src/brainlayer/`
Applied to files:
src/brainlayer/cli/__init__.py
🔇 Additional comments (6)
scripts/launchd/install.sh (1)
122-152: LGTM — agent-ingest wiring is consistent with the existing pattern. Case entry, `all` branch, `remove` branch (with idempotent `|| true`), and usage string all stay in sync. The `remove_plist agent-ingest 2>/dev/null || true` is belt-and-suspenders given `remove_plist`'s internal suppression, but harmless and symmetric with `remove_plist enrich`.

tests/test_agent_ingest_launchd.py (1)
7-23: LGTM. Plist shape + installer references are covered without coupling to implementation details. Using `ET.iter("string")` is fine here because the plist DOCTYPE is external and `xml.etree` won't fetch it.

scripts/launchd/com.brainlayer.agent-ingest.plist (1)
1-44: Persistence/throttle settings look right for a background watcher. `KeepAlive=true` + `RunAtLoad=true` + `ThrottleInterval=5` + `Nice=10` + `ProcessType=Background` matches the prior LaunchAgent pattern for persistent watcher processes. `PYTHONUNBUFFERED=1` ensures logs flush promptly to the configured paths.

One optional consideration: if you want launchd to also throttle restart on crash loops more aggressively, you can switch `KeepAlive` to a dict with `SuccessfulExit=false` so launchd doesn't respawn on clean exits — but that's only relevant if `watch-agents` is ever expected to exit cleanly (currently it runs forever). Based on learnings from `scripts/launchd/com.brainlayer.watch.plist`: "Use LaunchAgent … with KeepAlive=true and Nice=10 for persistent watcher process".

tests/test_ingest_cursor.py (1)
33-92: LGTM — parser contract is well covered. Happy-path user/assistant extraction, `<user_query>` unwrapping, fenced-code `ai_code` classification, and the min-length filter are all asserted. `tmp_path` ensures hermetic fixtures per test.

tests/test_agent_session_watcher.py (1)
30-72: LGTM — good multi-source watcher coverage. The test exercises the three important state transitions: initial ingest across three sources, idempotent no-op on an unchanged poll, and delta detection after append. Tracking both `mtime_ns` and `size` in the registry makes this robust even on coarse-mtime filesystems (the append changes size deterministically).

One small note for readers: the `cursor` source here uses `["**/*.jsonl"]`, whereas the production CLI registers it as `["**/agent-transcripts/**/*.jsonl"]`. That's fine for verifying watcher mechanics, just worth being aware of if you later add a test that pins the CLI-level pattern.

tests/test_ingest_gemini.py (1)
14-78: LGTM — asserts the key parser invariants. Covers: `info` filtering, `user` → `user_message` mapping, `gemini` → `assistant_text` mapping, `source`/`project`/`session_id`/`timestamp` metadata propagation, and fenced-code `ai_code` classification. Fixture path under `.gemini/tmp/brainlayer/chats` exercises `_extract_project_from_path`.
    def poll_once(self) -> int:
        processed = 0
        for source in self.sources:
            for file_path in self._discover_files(source):
                try:
                    stat = file_path.stat()
                except OSError:
                    continue

                state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
                previous = self.registry.get(str(file_path))
                if previous == state:
                    continue

                indexed = source.ingest(file_path)
                logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
                self.registry.set(str(file_path), **state)
                processed += 1

        now = time.monotonic()
        if now - self._last_registry_flush >= self.registry_flush_interval_s:
            self.registry.flush()
            self._last_registry_flush = now
        return processed
Isolate per-file ingest failures so a single bad file doesn't abort the whole poll.
source.ingest(file_path) is not wrapped in try/except. If any single file raises (malformed JSON, transient DB lock, bug in one adapter), poll_once() unwinds and the surrounding start() logs a generic "Agent watcher poll failed" — skipping all remaining files across all remaining sources until the next tick. The registry state is also not updated for the failing file (good, it will retry), but neighbors are unnecessarily penalized.
🛠️ Suggested fix
    - indexed = source.ingest(file_path)
    - logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
    - self.registry.set(str(file_path), **state)
    - processed += 1
    + try:
    +     indexed = source.ingest(file_path)
    + except Exception as exc:  # noqa: BLE001
    +     logger.warning(
    +         "Agent ingest failed for %s (%s): %s", source.name, file_path, exc
    +     )
    +     continue
    + logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
    + self.registry.set(str(file_path), **state)
    + processed += 1

📝 Committable suggestion
+ processed += 1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:

    def poll_once(self) -> int:
        processed = 0
        for source in self.sources:
            for file_path in self._discover_files(source):
                try:
                    stat = file_path.stat()
                except OSError:
                    continue
                state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
                previous = self.registry.get(str(file_path))
                if previous == state:
                    continue
                indexed = source.ingest(file_path)
                logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
                self.registry.set(str(file_path), **state)
                processed += 1
        now = time.monotonic()
        if now - self._last_registry_flush >= self.registry_flush_interval_s:
            self.registry.flush()
            self._last_registry_flush = now
        return processed

Suggested:

    def poll_once(self) -> int:
        processed = 0
        for source in self.sources:
            for file_path in self._discover_files(source):
                try:
                    stat = file_path.stat()
                except OSError:
                    continue
                state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
                previous = self.registry.get(str(file_path))
                if previous == state:
                    continue
                try:
                    indexed = source.ingest(file_path)
                except Exception as exc:  # noqa: BLE001
                    logger.warning(
                        "Agent ingest failed for %s (%s): %s", source.name, file_path, exc
                    )
                    continue
                logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
                self.registry.set(str(file_path), **state)
                processed += 1
        now = time.monotonic()
        if now - self._last_registry_flush >= self.registry_flush_interval_s:
            self.registry.flush()
            self._last_registry_flush = now
        return processed
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/agent_watch.py` around lines 102 - 125, Wrap the call to
source.ingest(file_path) inside a per-file try/except in poll_once so a single
file error doesn't abort the whole loop: call source.ingest(file_path) in a try
block, on success log via logger.info, call self.registry.set(...) and increment
processed; on exception catch Exception as e and log a per-file error including
source.name, file_path, and the exception (with traceback/context) and then
continue to the next file without updating the registry or processed count.
Ensure the except does not re-raise so remaining files and sources are still
processed and the registry flush logic remains unchanged.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Inspect chunk id derivation + INSERT semantics in index_new.py and related writers
fd -t f 'index_new.py' src | xargs rg -nP -C4 '\b(insert|INSERT|OR IGNORE|OR REPLACE|chunk_id|hash|sha|uuid)\b'
rg -nP -C3 '\b(def\s+index_chunks_to_sqlite|generate_chunk_id|compute_chunk_id)\b'
# Check VectorStore.add_chunk / upsert paths
ast-grep --pattern $'class VectorStore:
$$$
def $_($$$):
$$$
$$$'

Repository: EtanHey/brainlayer
Length of output: 891
🏁 Script executed:
# Check INSERT/UPSERT semantics in index_new.py
rg -nP -A20 'def index_chunks_to_sqlite' src/brainlayer/index_new.py

Repository: EtanHey/brainlayer
Length of output: 812
🏁 Script executed:
# Check if there's REPLACE or IGNORE handling
rg -nP 'OR IGNORE|OR REPLACE|INSERT.*VALUES|INSERT.*SELECT' src/brainlayer/index_new.py

Repository: EtanHey/brainlayer
Length of output: 44
🏁 Script executed:
# Find ingest_cursor_session and ingest_gemini_session implementations
fd -t f 'ingest.*\.py' src | xargs rg -l 'ingest_cursor_session|ingest_gemini_session'

Repository: EtanHey/brainlayer
Length of output: 156
🏁 Script executed:
# Check VectorStore chunk insertion logic
rg -nP -B2 -A15 'def add_chunk|def upsert.*chunk' src/brainlayer/

Repository: EtanHey/brainlayer
Length of output: 1449
🏁 Script executed:
# Get full INSERT statement including ON CONFLICT clause
rg -nP -A30 'def upsert_chunks' src/brainlayer/vector_store.py | head -50

Repository: EtanHey/brainlayer
Length of output: 1748
🏁 Script executed:
# Check if there's ON CONFLICT handling in chunks table
rg -nP 'ON CONFLICT|UNIQUE.*chunks|PRIMARY KEY.*chunks' src/brainlayer/vector_store.py

Repository: EtanHey/brainlayer
Length of output: 112
🏁 Script executed:
# Check ingest_cursor_session - does it re-parse entire file or track offsets?
rg -nP -B3 -A20 'def ingest_cursor_session' src/brainlayer/ingest/cursor.py

Repository: EtanHey/brainlayer
Length of output: 929
🏁 Script executed:
# Check ingest_gemini_session similarly
rg -nP -B3 -A20 'def ingest_gemini_session' src/brainlayer/ingest/gemini.py

Repository: EtanHey/brainlayer
Length of output: 920
🏁 Script executed:
# Get full ingest_cursor_session to see if it re-parses entire file or uses offsets
rg -nP -A60 'def ingest_cursor_session' src/brainlayer/ingest/cursor.py | head -80

Repository: EtanHey/brainlayer
Length of output: 2374
🏁 Script executed:
# Check if there's any offset tracking in the ingest flow
rg -nP 'offset|position|seek|tell|incremental' src/brainlayer/ingest/cursor.py src/brainlayer/ingest/gemini.py

Repository: EtanHey/brainlayer
Length of output: 44
🏁 Script executed:
# Verify chunk enumeration - is it deterministic across re-parses?
rg -nP -B2 -A10 'enumerate.*chunks|chunk.*enumerate' src/brainlayer/ingest/cursor.py src/brainlayer/ingest/gemini.py
Length of output: 44
🏁 Script executed:
# Check agent_watch.py to see exactly how source.ingest is called and if file is fully re-read
rg -nP -B5 -A5 'source.ingest' src/brainlayer/agent_watch.py
Length of output: 585
Re-ingestion waste on live agent sessions — entire file re-parsed and re-embedded on every mtime/size change, even for unchanged content.
AgentSessionWatcher tracks only {mtime_ns, size} per file and triggers source.ingest(file_path) on any change. The ingest functions (ingest_cursor_session, ingest_gemini_session) call their respective parsers (parse_cursor_session, parse_gemini_session), which read the entire session file, re-parse all entries, and chunk them completely. Since Cursor/Gemini session JSONLs are append-only during active sessions, every poll interval re-parses, re-chunks, and re-embeds the entire transcript (potentially multi-MB).
Database deduplication is in place (upsert_chunks uses ON CONFLICT(id) DO UPDATE SET), so duplicate rows won't accumulate—but this mechanism still pays the full cost of parsing, chunking, and embedding the unchanged portion of the file. Contrast with src/brainlayer/watcher.py's JSONLWatcher, which persists per-file byte offsets and processes only new lines (plus rewind detection).
On a 30s poll interval with a live agent transcribing a multi-MB session, this pattern will:
- Waste CPU and embedding quota on unchanged content.
- Block the single poll thread for the duration of re-parsing and re-embedding.
- Slowly starve other work competing for the same embeddings API rate limit.
Adopt offset-based incremental ingestion (like JSONLWatcher) for append-only JSONL sources to ingest only new lines since the last offset.
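To make the suggestion concrete, here is a minimal sketch of offset-based tailing for an append-only JSONL file. The function and variable names are illustrative, not the actual JSONLWatcher API; the rewind check mirrors the behavior described above, and partial trailing lines are deferred to the next poll.

```python
import json
import os


def read_new_jsonl_entries(path: str, last_offset: int) -> tuple[list[dict], int]:
    """Parse only JSONL lines appended since last_offset; return (entries, new_offset)."""
    size = os.path.getsize(path)
    # Rewind detection: if the file shrank, it was rewritten -- reparse from the start.
    offset = 0 if size < last_offset else last_offset
    entries: list[dict] = []
    with open(path, "rb") as fh:  # binary mode keeps byte offsets exact
        fh.seek(offset)
        for raw in fh:
            if not raw.endswith(b"\n"):
                break  # partial trailing line still being written; retry next poll
            offset += len(raw)
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                data = json.loads(stripped)
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue
            if isinstance(data, dict):
                entries.append(data)
    return entries, offset
```

On each poll the watcher would persist the returned offset alongside mtime/size, so a live multi-MB session costs only the bytes appended since the last cycle.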
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/agent_watch.py` around lines 102 - 125,
AgentSessionWatcher.poll_once currently re-parses entire append-only JSONL
sessions on any mtime/size change; change to offset-based incremental ingestion
like JSONLWatcher: persist a per-file byte offset in the registry (keyed by file
path) and on each poll call read the saved offset, detect rewind (file.size <
offset -> full reparse), open the file and seek to offset to parse only new
lines, then call ingest_cursor_session/ingest_gemini_session (or refactor them
to accept a start_offset or file-like stream) to process only appended entries,
update the registry with the new offset (and mtime/size) after successful
ingest, and keep the existing flush logic; this avoids full re-parsing and
re-embedding unchanged content.
if not created_at:
    # Try to get timestamp from source file (first JSONL message)
    try:
        import json as _json

        with open(source_file) as _f:
            for _line in _f:
                _line = _line.strip()
                if not _line:
                    continue
                _data = _json.loads(_line)
                if "timestamp" in _data:
                    created_at = _data["timestamp"]
                    break
    except Exception as e:
        logger.debug("Could not extract timestamp from %s: %s", source_file, e)
🧹 Nitpick | 🔵 Trivial
Minor: broad except swallows everything, plus no encoding on open.
The fallback extraction path catches every Exception subclass (KeyboardInterrupt still escapes, but broad failures like MemoryError do not) and opens the file without an explicit encoding. For the JSONL use case this is typically fine, but:

- Consider narrowing to (OSError, json.JSONDecodeError, UnicodeDecodeError).
- Pass encoding="utf-8" to open to avoid locale-dependent behavior on non-macOS/Linux hosts or unusual locales.
Also note: when callers pass a non-JSONL source_file (e.g. Gemini's single-document session-*.json), this block harmlessly fails and falls through to datetime.now(...). Since Gemini already passes created_at=session_ts from the adapter, that's acceptable, but worth being aware of.
♻️ Proposed refactor
if not created_at:
# Try to get timestamp from source file (first JSONL message)
try:
import json as _json
- with open(source_file) as _f:
+ with open(source_file, encoding="utf-8") as _f:
for _line in _f:
_line = _line.strip()
if not _line:
continue
_data = _json.loads(_line)
if "timestamp" in _data:
created_at = _data["timestamp"]
break
- except Exception as e:
+ except (OSError, ValueError, UnicodeDecodeError) as e:
logger.debug("Could not extract timestamp from %s: %s", source_file, e)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/index_new.py` around lines 48 - 63, The fallback that inspects
source_file to set created_at uses a broad except and opens files without an
encoding; narrow the exception handling to only (OSError, json.JSONDecodeError,
UnicodeDecodeError) and open the file with encoding="utf-8" when reading lines,
and update the except to reference logger.debug("Could not extract timestamp
from %s: %s", source_file, e) for those specific exceptions; locate the logic
around created_at, source_file and the logger.debug call in this block to apply
the changes (ensure json.JSONDecodeError is imported or referenced as needed).
try:
    line = json.loads(raw)
except (json.JSONDecodeError, ValueError):
    continue

role = line.get("role")
message = line.get("message") or {}
text = _extract_text(message.get("content")).strip()
Guard against non-dict JSON payloads before .get().
json.loads(raw) can return any JSON value (list, string, number, null). On a top-level non-object line (not unheard of in heterogeneous transcript tooling), line.get("role") / line.get("message") will raise AttributeError, which is not caught here and — once the watcher wraps ingest in try/except per the other comment — will abort this file's parse mid-stream.
🛡️ Suggested fix
try:
line = json.loads(raw)
except (json.JSONDecodeError, ValueError):
continue
+ if not isinstance(line, dict):
+ continue
role = line.get("role")
message = line.get("message") or {}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/cursor.py` around lines 81 - 88, After json.loads(raw)
in cursor.py ensure the decoded value is a dict before calling .get(): check "if
not isinstance(line, dict): continue" immediately after parsing (and also ensure
"message" is treated as a dict, e.g., "message = line.get('message') or {}" only
after confirming line is a dict), so that subsequent uses of line.get("role"),
line.get("message"), and _extract_text(message.get("content")) won't raise
AttributeError for non-object JSON payloads; update the block around the
variables line, role, message and text to bail out/continue on non-dict inputs.
already_indexed: set[str] = set()
if not dry_run and db_path and db_path.exists():
    try:
        from ..vector_store import VectorStore

        with VectorStore(db_path) as store:
            cursor = store._read_cursor()
            rows = cursor.execute("SELECT DISTINCT source_file FROM chunks WHERE source = 'cursor'")
            already_indexed = {row[0] for row in rows}
    except Exception as exc:
        logger.debug("Could not check existing cursor chunks: %s", exc)

files_processed = 0
total_chunks = 0

for file_path in jsonl_files:
    if str(file_path) in already_indexed:
        logger.debug("Skipping already-indexed %s", file_path.name)
        continue
    try:
        total_chunks += ingest_cursor_session(
            file_path,
            db_path=db_path,
            project_override=project_override,
            dry_run=dry_run,
            verbose=verbose,
        )
        files_processed += 1
    except Exception as exc:
        logger.warning("Failed to ingest %s: %s", file_path.name, exc)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Compare codex vs cursor vs gemini dedup logic
fd -t f -e py . src/brainlayer/ingest | xargs rg -nP -C3 'already_indexed|DISTINCT source_file|source_file\s*='
# Inspect chunks schema for any offset/mtime columns
rg -nP -C2 'CREATE\s+TABLE\s+chunks|ALTER\s+TABLE\s+chunks' --type=py --type=sql
Length of output: 13681
Path-only dedup prevents re-ingestion of updated Cursor sessions across all ingest paths.
already_indexed checks only whether a file path exists in prior chunks, ignoring any content changes. Once a Cursor transcript is ingested, subsequent runs skip it forever—even if the session has new turns. This applies equally to cursor.py, gemini.py, and codex.py, all following the same pattern of SELECT DISTINCT source_file ... WHERE source = '...' followed by membership check on the path string.
The chunks table schema lacks any file metadata columns (mtime, size, offset, etc.) to track file state, so proper incremental detection is not currently possible. This is inconsistent with the expected behavior where file changes trigger re-ingestion, and means users relying on batch backfill via the CLI will silently miss updates to live sessions.
Either implement file state tracking (mtime/size) in the schema and comparison logic across all three ingest paths, or explicitly document that each ingest-* is one-shot-per-file and direct users to watch-agents for incremental updates. Ensure both codepaths define "already indexed" consistently.
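As a sketch of the first option, the mtime/size comparison can live outside the chunks schema if file state is kept in a small registry keyed by path. The registry shape here is an assumption (a plain dict standing in for whatever persistence the project chooses), not existing code.

```python
import os


def should_ingest(path: str, registry: dict) -> bool:
    """Return True when `path` is new or its mtime/size differ from the recorded state."""
    st = os.stat(path)
    current = {"mtime_ns": st.st_mtime_ns, "size": st.st_size}
    return registry.get(path) != current


def record_ingested(path: str, registry: dict) -> None:
    """Call only after a successful ingest, so failed files are retried next run."""
    st = os.stat(path)
    registry[path] = {"mtime_ns": st.st_mtime_ns, "size": st.st_size}
```

The batch backfill loop would then replace the path-membership check with should_ingest(...), making "already indexed" mean "unchanged since last successful ingest" consistently across all three adapters.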
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/cursor.py` around lines 244 - 273, The current dedup
logic uses only path membership in already_indexed (see variable already_indexed
and the SELECT DISTINCT source_file query) which causes updated Cursor sessions
to be skipped; fix by either (A) extending the chunks schema to store file state
(e.g., mtime, size, checksum) and update VectorStore write/read logic so
ingest_cursor_session (and analogous code in gemini.py and codex.py) compares
stored file metadata against current file metadata to decide re-ingest, or (B)
explicitly document that ingest-* functions are one-shot-per-file and
remove/clarify the path-only dedup behavior so all three ingest paths define
"already indexed" consistently; implement the chosen approach across cursor.py,
gemini.py, and codex.py and update VectorStore read/write calls that
populate/consume chunks accordingly.
def _extract_project_from_path(file_path: Path) -> Optional[str]:
    parts = file_path.parts
    if "tmp" in parts:
        idx = parts.index("tmp") + 1
        if idx < len(parts):
            return parts[idx]
    return None
Brittle _extract_project_from_path — first "tmp" segment wins.
parts.index("tmp") returns the first occurrence, which works for the canonical ~/.gemini/tmp/<project>/chats/... layout but silently mis-attributes the project if any ancestor path segment happens to be "tmp" (e.g., /private/tmp/... sandboxes, CI tempdirs, or a custom sessions_dir whose root includes tmp). Anchor the search relative to the known Gemini root or walk from the right so only the .gemini/tmp/<project> hop is matched.
♻️ Proposed fix
def _extract_project_from_path(file_path: Path) -> Optional[str]:
parts = file_path.parts
- if "tmp" in parts:
- idx = parts.index("tmp") + 1
- if idx < len(parts):
- return parts[idx]
- return None
+ # Canonical layout: <...>/.gemini/tmp/<project>/chats/session-*.json
+ for i in range(len(parts) - 1):
+ if parts[i] == ".gemini" and i + 2 < len(parts) and parts[i + 1] == "tmp":
+ return parts[i + 2]
+ # Fallback: the directory two levels above "chats/session-*.json"
+ if len(parts) >= 3 and parts[-2] == "chats":
+ return parts[-3]
+ return None

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/gemini.py` around lines 36 - 42, The current
_extract_project_from_path uses parts.index("tmp") which picks the first "tmp"
in the path and can mis-attribute the project; change it to locate the "tmp"
that is the Gemini tmp directory by searching for the sequence (".gemini",
"tmp") (or walk parts from right to left to find a "tmp" whose parent is
".gemini") and return the following segment as the project; if that anchored
match isn't found you can fall back to using the last occurrence of "tmp" (i.e.,
search parts reversed) so you don't incorrectly pick an earlier "tmp" like
/private/tmp — update _extract_project_from_path to implement this
anchored/reversed search using file_path.parts and the ".gemini" and "tmp"
tokens.
with open(file_path) as fh:
    payload = json.load(fh)
🧹 Nitpick | 🔵 Trivial
Open with explicit UTF-8 encoding.
Both parse_gemini_session (open(file_path) at line 51) and any other open in this module should pass encoding="utf-8"; Gemini writes these JSONs as UTF-8, and relying on the platform default can bite on non-POSIX locales or Windows.
♻️ Proposed fix
- with open(file_path) as fh:
+ with open(file_path, encoding="utf-8") as fh:
payload = json.load(fh)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
with open(file_path, encoding="utf-8") as fh:
    payload = json.load(fh)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/gemini.py` around lines 51 - 52, The open(file_path)
call in parse_gemini_session should explicitly specify UTF-8 encoding to avoid
platform-dependent defaults; update the call in parse_gemini_session (and any
other open(...) usages in this module) to use open(file_path, encoding="utf-8")
so the JSON payload is read as UTF-8 consistently.
session_files = sorted(sessions_dir.glob("**/chats/session-*.json"))
if since_days is not None:
    cutoff = datetime.now(timezone.utc).timestamp() - since_days * 86400
    session_files = [f for f in session_files if f.stat().st_mtime >= cutoff]
🧹 Nitpick | 🔵 Trivial
**/chats/session-*.json can walk the entire home tree — scope the glob.
sessions_dir.glob("**/chats/session-*.json") on the default ~/.gemini/tmp is fine, but if a caller ever passes a broader directory (e.g., Path.home() or a project root), this recursive glob can become very expensive and may traverse unrelated chats/ folders. Consider validating that sessions_dir resolves under ~/.gemini/tmp (or otherwise document the expected layout) before the recursive walk. Also, for very large trees, os.scandir-based iteration would stream results instead of materializing the full list before filtering by mtime.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/gemini.py` around lines 204 - 207, The recursive glob
sessions_dir.glob("**/chats/session-*.json") can traverse the whole filesystem
if sessions_dir is broad; change to explicitly target the expected chats folder
and validate sessions_dir first: ensure
sessions_dir.resolve().is_relative_to(Path.home() / ".gemini" / "tmp") (or
otherwise enforce/validate the expected layout) and replace the recursive
pattern with a scoped lookup like
sessions_dir.joinpath("chats").glob("session-*.json") (or stream files via
os.scandir on sessions_dir/"chats") before applying the since_days cutoff using
cutoff and f.stat().st_mtime; if validation fails, raise or log and skip
scanning to avoid expensive wide walks.
already_indexed: set[str] = set()
if not dry_run and db_path and db_path.exists():
    try:
        from ..vector_store import VectorStore

        with VectorStore(db_path) as store:
            cursor = store._read_cursor()
            rows = cursor.execute("SELECT DISTINCT source_file FROM chunks WHERE source = 'gemini'")
            already_indexed = {row[0] for row in rows}
    except Exception as exc:
        logger.debug("Could not check existing gemini chunks: %s", exc)
🧹 Nitpick | 🔵 Trivial
Reaching into VectorStore._read_cursor() is a leaky abstraction.
_read_cursor() is a private method (underscore prefix) of VectorStore. Coupling the Gemini adapter to it means any refactor of the store (e.g., switching to a context-managed read cursor or a dedicated query method) silently breaks this skip-already-indexed path. Consider adding a public helper on VectorStore such as list_indexed_source_files(source: str) -> set[str] and calling that here (and from cursor.py which likely does the same thing). Happy to help extract the helper if useful.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/ingest/gemini.py` around lines 218 - 228, Replace the direct
call into the private VectorStore._read_cursor() from the Gemini ingest code
with a public helper on VectorStore (e.g., add list_indexed_source_files(self,
source: str) -> set[str]) that encapsulates the SQL ("SELECT DISTINCT
source_file FROM chunks WHERE source = ?") and returns the set of filenames;
then update the Gemini adapter (the block that currently imports VectorStore and
uses _read_cursor) to call VectorStore.list_indexed_source_files("gemini") and
assign its result to already_indexed, removing the dependency on the private
_read_cursor implementation.


Summary
- `watch-agents` launchd wiring plus Cursor/Gemini parsers and tests

Pivot / Scope
Backfill State At Pivot
- codex_cli: 8962
- cursor: 630
- gemini: 2

Notes
- `CLAUDE.md` changes were intentionally left out of this PR because they are unrelated to the ingestion checkpoint

Validation Already Run
- pytest -q tests/test_ingest_cursor.py tests/test_ingest_gemini.py tests/test_agent_session_watcher.py tests/test_agent_ingest_launchd.py: 9 passed

Note
Medium Risk
Adds new ingestion paths and a persistent watcher that automatically writes into the local sqlite DB, so bugs could cause missed/duplicate indexing or unexpected background resource usage. Changes are localized to ingestion/CLI/launchd wiring and covered by new unit tests.
Overview
Adds first-class ingestion support for Cursor and Gemini session artifacts, including new CLI commands `ingest-cursor` and `ingest-gemini` plus parsers/adapters that normalize transcripts and index them with per-source metadata.

Introduces a new `watch-agents` CLI command backed by `AgentSessionWatcher`, which polls Codex/Cursor/Gemini session roots and ingests files whose mtime/size changed, persisting state in an on-disk registry.

Extends the macOS launchd installer to manage a new `com.brainlayer.agent-ingest` LaunchAgent (including add/remove/all flows) and updates `index_chunks_to_sqlite` to accept an optional `created_at` override so ingesters can preserve session timestamps.

Reviewed by Cursor Bugbot for commit bd4f7e7. Bugbot is set up for automated code reviews on this repo. Configure here.
Summary by CodeRabbit
Release Notes
New Features
Chores
Note
Add multi-agent ingestion for Codex, Cursor, and Gemini with a polling `watch-agents` daemon:

- `AgentSessionWatcher` (`agent_watch.py`), a polling loop that detects file changes via mtime/size and invokes per-source ingest callbacks, backed by a JSON registry for persistence across restarts.
- New CLI commands: `ingest-cursor`, `ingest-gemini`, and `watch-agents` (the long-lived multi-source watcher with SIGTERM/SIGINT handling).
- Launchd wiring runs `watch-agents` as a background service on a 30-second poll interval.
- `index_chunks_to_sqlite` now accepts an explicit `created_at` timestamp, avoiding redundant file reads when the caller already knows the session time.
🗂️ Filtered Issues