44 changes: 44 additions & 0 deletions scripts/launchd/com.brainlayer.agent-ingest.plist
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.brainlayer.agent-ingest</string>

<key>ProgramArguments</key>
<array>
<string>__BRAINLAYER_BIN__</string>
<string>watch-agents</string>
<string>--poll</string>
<string>30.0</string>
</array>

<key>StandardOutPath</key>
<string>__HOME__/.local/share/brainlayer/logs/agent-ingest.log</string>
<key>StandardErrorPath</key>
<string>__HOME__/.local/share/brainlayer/logs/agent-ingest.err</string>

<key>EnvironmentVariables</key>
<dict>
<key>PATH</key>
<string>/usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin</string>
<key>PYTHONUNBUFFERED</key>
<string>1</string>
</dict>

<key>KeepAlive</key>
<true/>

<key>RunAtLoad</key>
<true/>

<key>ThrottleInterval</key>
<integer>5</integer>

<key>Nice</key>
<integer>10</integer>

<key>ProcessType</key>
<string>Background</string>
</dict>
</plist>
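The `__HOME__` and `__BRAINLAYER_BIN__` tokens indicate this plist is a template that the installer renders before copying it into `~/Library/LaunchAgents`. A minimal sketch of that substitution (the binary path is an assumption; the actual install.sh may resolve it differently):

```shell
# Render the plist template by substituting placeholders (sketch only;
# the real install.sh may use different paths or logic).
BIN_PATH="$HOME/.local/bin/brainlayer"  # assumed install location
sed -e "s|__HOME__|$HOME|g" \
    -e "s|__BRAINLAYER_BIN__|$BIN_PATH|g" <<'EOF'
<string>__BRAINLAYER_BIN__</string>
<string>__HOME__/.local/share/brainlayer/logs/agent-ingest.log</string>
EOF
```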
8 changes: 7 additions & 1 deletion scripts/launchd/install.sh
@@ -6,6 +6,7 @@
# ./scripts/launchd/install.sh index # Install indexing only
# ./scripts/launchd/install.sh enrich # Install enrichment only
# ./scripts/launchd/install.sh decay # Install decay only
# ./scripts/launchd/install.sh agent-ingest # Install multi-agent ingest watcher
# ./scripts/launchd/install.sh load enrichment
# ./scripts/launchd/install.sh unload enrichment
# ./scripts/launchd/install.sh checkpoint # Install WAL checkpoint only
@@ -118,6 +119,9 @@ case "${1:-all}" in
decay)
install_plist decay
;;
agent-ingest)
install_plist agent-ingest
;;
load)
load_plist "${2:-enrichment}"
;;
@@ -129,6 +133,7 @@ case "${1:-all}" in
;;
all)
install_plist index
install_plist agent-ingest
install_plist enrichment
install_plist decay
install_plist wal-checkpoint
@@ -137,13 +142,14 @@ case "${1:-all}" in
;;
remove)
remove_plist index
remove_plist agent-ingest 2>/dev/null || true
remove_plist enrich 2>/dev/null || true
remove_plist enrichment 2>/dev/null || true
remove_plist decay 2>/dev/null || true
remove_plist wal-checkpoint
;;
*)
echo "Usage: $0 [index|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]"
echo "Usage: $0 [index|agent-ingest|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]"
exit 1
;;
esac
139 changes: 139 additions & 0 deletions src/brainlayer/agent_watch.py
@@ -0,0 +1,139 @@
"""Polling watcher for Codex, Cursor, and Gemini session artifacts."""

from __future__ import annotations

import json
import logging
import os
import tempfile
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)


class AgentSessionRegistry:
"""Persists mtime/size state for agent session files."""

def __init__(self, path: str | Path):
self.path = Path(path)
self._data: dict[str, dict[str, int]] = {}
self._dirty = False
self._load()

def _load(self) -> None:
try:
with open(self.path) as fh:
data = json.load(fh)
if isinstance(data, dict):
self._data = data
except (OSError, json.JSONDecodeError, ValueError):
self._data = {}

def get(self, filepath: str) -> dict[str, int] | None:
return self._data.get(filepath)

def set(self, filepath: str, *, mtime_ns: int, size: int) -> None:
self._data[filepath] = {"mtime_ns": mtime_ns, "size": size}
self._dirty = True

def flush(self) -> bool:
if not self._dirty:
return True

tmp_path = None
try:
self.path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp")
with os.fdopen(fd, "w") as fh:
json.dump(self._data, fh)
os.rename(tmp_path, self.path)
🟒 Low brainlayer/agent_watch.py:53

On Windows, os.rename(tmp_path, self.path) raises FileExistsError when self.path already exists. After the first successful flush, all subsequent flushes fail silently β€” the warning is logged but the registry file is never updated. Use os.replace() for cross-platform atomic file replacement.

-            os.rename(tmp_path, self.path)
+            os.replace(tmp_path, self.path)
Evidence trail:
src/brainlayer/agent_watch.py lines 45-65 (commit REVIEWED_COMMIT) - shows `os.rename(tmp_path, self.path)` at line 53 with OSError handling. Python documentation at https://docs.python.org/3/library/os.html confirms `os.rename()` raises `FileExistsError` on Windows when destination exists, and recommends `os.replace()` for cross-platform overwriting.

self._dirty = False
return True
except OSError as exc:
logger.warning("Failed to flush agent registry: %s", exc)
if tmp_path:
try:
os.unlink(tmp_path)
except OSError:
pass
return False
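The review above flags `os.rename` as non-portable for overwriting. The cross-platform write-temp-then-replace pattern it recommends, as a standalone sketch (function name is illustrative, not part of this module):

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON to a temp file in the same directory, then swap it into
    place in one step. os.replace() overwrites the destination on every
    platform, whereas os.rename() raises FileExistsError on Windows when
    the destination already exists."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(data, fh)
        os.replace(tmp, path)  # atomic on POSIX; overwrites on Windows too
    except OSError:
        # Clean up the orphaned temp file before propagating the error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
```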


@dataclass(frozen=True)
class AgentSessionSource:
name: str
patterns: list[str]
ingest: Callable[[Path], int]
root: Path


class AgentSessionWatcher:
"""Poll source roots and ingest files whose size/mtime changed."""

def __init__(
self,
registry_path: str | Path,
sources: list[AgentSessionSource],
poll_interval_s: float = 30.0,
registry_flush_interval_s: float = 5.0,
):
self.registry = AgentSessionRegistry(registry_path)
self.sources = sources
self.poll_interval_s = poll_interval_s
self.registry_flush_interval_s = registry_flush_interval_s
self._stop = threading.Event()
self._last_registry_flush = time.monotonic()

def _discover_files(self, source: AgentSessionSource) -> list[Path]:
files: list[Path] = []
if not source.root.exists():
return files
for pattern in source.patterns:
try:
files.extend(path for path in source.root.glob(pattern) if path.is_file())
except OSError:
continue
return sorted(set(files))

def poll_once(self) -> int:
processed = 0
for source in self.sources:
for file_path in self._discover_files(source):
try:
stat = file_path.stat()
except OSError:
continue

state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
previous = self.registry.get(str(file_path))
if previous == state:
continue

indexed = source.ingest(file_path)
logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
self.registry.set(str(file_path), **state)
processed += 1
Missing per-file error handling blocks all ingestion

High Severity

The source.ingest(file_path) call in poll_once lacks a per-file try-except. If one file fails, the exception escapes poll_once, is caught by start(), and all remaining files in that cycle are skipped. On the next cycle the same broken file is retried (it's not in the registry), fails again, and blocks all alphabetically-subsequent files β€” permanently. The batch equivalents (ingest_codex_dir, ingest_cursor_dir, ingest_gemini_dir) all correctly wrap each file in try-except with a logger.warning, showing the intended pattern.

Reviewed by Cursor Bugbot for commit bd4f7e7.


now = time.monotonic()
if now - self._last_registry_flush >= self.registry_flush_interval_s:
self.registry.flush()
self._last_registry_flush = now
return processed
Comment on lines +102 to +125
⚠️ Potential issue | 🟠 Major

Isolate per-file ingest failures so a single bad file doesn't abort the whole poll.

source.ingest(file_path) is not wrapped in try/except. If any single file raises (malformed JSON, transient DB lock, bug in one adapter), poll_once() unwinds and the surrounding start() logs a generic "Agent watcher poll failed" β€” skipping all remaining files across all remaining sources until the next tick. The registry state is also not updated for the failing file (good, it will retry), but neighbors are unnecessarily penalized.

πŸ› οΈ Suggested fix
-                indexed = source.ingest(file_path)
-                logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
-                self.registry.set(str(file_path), **state)
-                processed += 1
+                try:
+                    indexed = source.ingest(file_path)
+                except Exception as exc:  # noqa: BLE001
+                    logger.warning(
+                        "Agent ingest failed for %s (%s): %s", source.name, file_path, exc
+                    )
+                    continue
+                logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
+                self.registry.set(str(file_path), **state)
+                processed += 1
πŸ“ Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def poll_once(self) -> int:
processed = 0
for source in self.sources:
for file_path in self._discover_files(source):
try:
stat = file_path.stat()
except OSError:
continue
state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
previous = self.registry.get(str(file_path))
if previous == state:
continue
indexed = source.ingest(file_path)
logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
self.registry.set(str(file_path), **state)
processed += 1
now = time.monotonic()
if now - self._last_registry_flush >= self.registry_flush_interval_s:
self.registry.flush()
self._last_registry_flush = now
return processed
def poll_once(self) -> int:
processed = 0
for source in self.sources:
for file_path in self._discover_files(source):
try:
stat = file_path.stat()
except OSError:
continue
state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
previous = self.registry.get(str(file_path))
if previous == state:
continue
try:
indexed = source.ingest(file_path)
except Exception as exc: # noqa: BLE001
logger.warning(
"Agent ingest failed for %s (%s): %s", source.name, file_path, exc
)
continue
logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
self.registry.set(str(file_path), **state)
processed += 1
now = time.monotonic()
if now - self._last_registry_flush >= self.registry_flush_interval_s:
self.registry.flush()
self._last_registry_flush = now
return processed

⚠️ Potential issue | πŸ”΄ Critical

Repository: EtanHey/brainlayer

Re-ingestion waste on live agent sessions β€” entire file re-parsed and re-embedded on every mtime/size change, even for unchanged content.

AgentSessionWatcher tracks only {mtime_ns, size} per file and triggers source.ingest(file_path) on any change. The ingest functions (ingest_cursor_session, ingest_gemini_session) call parse_cursor_session(file_path) which reads the entire JSONL, re-parses all entries, and chunks them completely. Since Cursor/Gemini session JSONLs are append-only during active sessions, every poll interval yields re-parsing, re-chunking, and re-embedding of the entire transcript (potentially multi-MB).

Database deduplication is in place (upsert_chunks uses ON CONFLICT(id) DO UPDATE SET), so duplicate rows won't accumulateβ€”but this mechanism still pays the full cost of parsing, chunking, and embedding the unchanged portion of the file. Contrast with src/brainlayer/watcher.py's JSONLWatcher, which persists per-file byte offsets and processes only new lines (plus rewind detection).

On a 30s poll interval with a live agent transcribing a multi-MB session, this pattern will:

  • Waste CPU and embedding quota on unchanged content.
  • Block the single poll thread for the duration of re-parsing and re-embedding.
  • Slowly starve other work competing for the same embeddings API rate limit.

Adopt offset-based incremental ingestion (like JSONLWatcher) for append-only JSONL sources to ingest only new lines since the last offset.
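A minimal sketch of the recommended pattern, assuming append-only JSONL files (the function name and registry shape here are illustrative; they are not the JSONLWatcher API, which is not shown in this diff):

```python
import os


def read_new_lines(path: str, offsets: dict[str, int]) -> list[str]:
    """Return only the lines appended since the last recorded byte offset.

    `offsets` is a per-file registry mapping path -> byte offset. A file
    that shrank below its recorded offset (a rewind) is re-read in full.
    """
    last = offsets.get(path, 0)
    size = os.path.getsize(path)
    if size < last:          # rewind: file was truncated or rewritten
        last = 0
    if size == last:         # no new bytes since the last poll
        return []
    with open(path, "rb") as fh:
        fh.seek(last)
        data = fh.read()
    # Advance the offset only past complete lines; a half-written final
    # line is left in place and picked up on the next poll.
    complete, sep, _partial = data.rpartition(b"\n")
    if not sep:
        return []
    offsets[path] = last + len(complete) + 1
    return complete.decode("utf-8").split("\n")
```

After a successful ingest of the returned lines, the caller would persist the updated offset (plus mtime/size) in the registry, keeping the existing flush cadence unchanged.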



def start(self) -> None:
while not self._stop.is_set():
try:
self.poll_once()
except Exception as exc:
logger.error("Agent watcher poll failed: %s", exc)
self._stop.wait(self.poll_interval_s)

self.registry.flush()

def stop(self) -> None:
self._stop.set()
